Skip to content

Commit

Permalink
0.8.6
Browse files Browse the repository at this point in the history
**Added**
- configurable color options for the navbar icons.
- countdown timer that utilizes the backlog command with delay to allow for powering off a plug after the pi has been shutdown using the system command option.
- polling option to check status based on configured interval.
  • Loading branch information
jneilliii authored Aug 11, 2019
1 parent a9cc240 commit 1b8b068
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 169 deletions.
104 changes: 61 additions & 43 deletions octoprint_tasmota/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
import threading

class tasmotaPlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.StartupPlugin):

def __init__(self):
self._logger = logging.getLogger("octoprint.plugins.tasmota")
self._tasmota_logger = logging.getLogger("octoprint.plugins.tasmota.debug")

##~~ StartupPlugin mixin

def on_startup(self, host, port):
# setup customized logger
from octoprint.logging.handlers import CleaningTimedRotatingFileHandler
Expand All @@ -34,20 +34,22 @@ def on_startup(self, host, port):
self._tasmota_logger.addHandler(tasmota_logging_handler)
self._tasmota_logger.setLevel(logging.DEBUG if self._settings.get_boolean(["debug_logging"]) else logging.INFO)
self._tasmota_logger.propagate = False

def on_after_startup(self):
self._logger.info("Tasmota loaded!")

##~~ SettingsPlugin mixin

def get_settings_defaults(self):
return dict(
singleRelay = True,
debug_logging = False,
arrSmartplugs = [{'ip':'','displayWarning':True,'idx':'1','warnPrinting':False,'gcodeEnabled':False,'gcodeOnDelay':0,'gcodeOffDelay':0,'autoConnect':True,'autoConnectDelay':10.0,'autoDisconnect':True,'autoDisconnectDelay':0,'sysCmdOn':False,'sysRunCmdOn':'','sysCmdOnDelay':0,'sysCmdOff':False,'sysRunCmdOff':'','sysCmdOffDelay':0,'currentState':'unknown','btnColor':'#808080','username':'admin','password':'','icon':'icon-bolt','label':''}],
polling_enabled = False,
polling_interval = 0,
arrSmartplugs = [{'ip':'','displayWarning':True,'idx':'1','warnPrinting':False,'gcodeEnabled':False,'gcodeOnDelay':0,'gcodeOffDelay':0,'autoConnect':True,'autoConnectDelay':10.0,'autoDisconnect':True,'autoDisconnectDelay':0,'sysCmdOn':False,'sysRunCmdOn':'','sysCmdOnDelay':0,'sysCmdOff':False,'sysRunCmdOff':'','sysCmdOffDelay':0,'currentState':'unknown','username':'admin','password':'','icon':'icon-bolt','label':'','on_color':'#00FF00','off_color':'#FF0000','unknown_color':'#808080','use_backlog':False,'backlog_on_delay':0,'backlog_off_delay':0}],
)
def on_settings_save(self, data):

def on_settings_save(self, data):
old_debug_logging = self._settings.get_boolean(["debug_logging"])

octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
Expand All @@ -58,47 +60,52 @@ def on_settings_save(self, data):
self._tasmota_logger.setLevel(logging.DEBUG)
else:
self._tasmota_logger.setLevel(logging.INFO)

def get_settings_version(self):
return 3
return 4

def on_settings_migrate(self, target, current=None):
if current is None or current < self.get_settings_version():
# Reset plug settings to defaults.
self._logger.debug("Resetting arrSmartplugs for tasmota settings.")
self._settings.set(['arrSmartplugs'], self.get_settings_defaults()["arrSmartplugs"])

##~~ AssetPlugin mixin

def get_assets(self):
return dict(
js=["js/tasmota.js"],
css=["css/tasmota.css"]
)

##~~ TemplatePlugin mixin

def get_template_configs(self):
return [
dict(type="navbar", custom_bindings=True),
dict(type="settings", custom_bindings=True)
]

##~~ SimpleApiPlugin mixin
def turn_on(self, plugip, plugidx, username="admin", password=""):

def turn_on(self, plugip, plugidx, username="admin", password="", backlog_delay=0):
if self._settings.get(['singleRelay']):
plugidx = ''
self._tasmota_logger.debug("Turning on %s index %s." % (plugip, plugidx))
try:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=Power" + str(plugidx) + "%20on").read()
response = json.loads(webresponse)
if int(backlog_delay) > 0:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=backlog%20delay%20" + str(int(backlog_delay)*10) + "%3BPower" + str(plugidx) + "%20on%3B").read()
response = dict()
response["POWER%s" % plugidx] = "ON"
else:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=Power" + str(plugidx) + "%20on").read()
response = json.loads(webresponse)
chk = response["POWER%s" % plugidx]
except:
except:
self._tasmota_logger.error('Invalid ip or unknown error connecting to %s.' % plugip, exc_info=True)
response = "Unknown error turning on %s index %s." % (plugip, plugidx)
chk = "UNKNOWN"

self._tasmota_logger.debug("Response: %s" % response)
if self._settings.get(['singleRelay']):
plugidx = '1'
Expand All @@ -109,20 +116,25 @@ def turn_on(self, plugip, plugidx, username="admin", password=""):
else:
self._tasmota_logger.debug(response)
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="unknown",ip=plugip,idx=plugidx))
def turn_off(self, plugip, plugidx, username="admin", password=""):

def turn_off(self, plugip, plugidx, username="admin", password="", backlog_delay=0):
if self._settings.get(['singleRelay']):
plugidx = ''
self._tasmota_logger.debug("Turning off %s index %s." % (plugip, plugidx))
try:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=Power" + str(plugidx) + "%20off").read()
response = json.loads(webresponse)
if int(backlog_delay) > 0:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=backlog%20delay%20" + str(int(backlog_delay)*10) + "%3BPower" + str(plugidx) + "%20off%3B").read()
response = dict()
response["POWER%s" % plugidx] = "OFF"
else:
webresponse = urllib2.urlopen("http://" + plugip + "/cm?user=" + username + "&password=" + password + "&cmnd=Power" + str(plugidx) + "%20off").read()
response = json.loads(webresponse)
chk = response["POWER%s" % plugidx]
except:
self._tasmota_logger.error('Invalid ip or unknown error connecting to %s.' % plugip, exc_info=True)
response = "Unknown error turning off %s index %s." % (plugip, plugidx)
chk = "UNKNOWN"

self._tasmota_logger.debug("Response: %s" % response)
if self._settings.get(['singleRelay']):
plugidx = '1'
Expand All @@ -133,7 +145,7 @@ def turn_off(self, plugip, plugidx, username="admin", password=""):
else:
self._tasmota_logger.debug(response)
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="unknown",ip=plugip,idx=plugidx))

def check_status(self, plugip, plugidx, username="admin", password=""):
if self._settings.get(['singleRelay']):
plugidx = ''
Expand All @@ -148,7 +160,7 @@ def check_status(self, plugip, plugidx, username="admin", password=""):
self._tasmota_logger.error('Invalid ip or unknown error connecting to %s.' % plugip, exc_info=True)
response = "unknown error with %s." % plugip
chk = "UNKNOWN"

self._tasmota_logger.debug("%s index %s is %s" % (plugip, plugidx, chk))
if self._settings.get(['singleRelay']):
plugidx = '1'
Expand All @@ -158,8 +170,8 @@ def check_status(self, plugip, plugidx, username="admin", password=""):
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="off",ip=plugip,idx=plugidx))
else:
self._tasmota_logger.debug(response)
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="unknown",ip=plugip,idx=plugidx))
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="unknown",ip=plugip,idx=plugidx))

def get_api_commands(self):
return dict(turnOn=["ip","idx"],turnOff=["ip","idx"],checkStatus=["ip","idx"],connectPrinter=[],disconnectPrinter=[],sysCommand=["cmd"])

Expand All @@ -168,18 +180,18 @@ def on_api_command(self, command, data):
if not user_permission.can():
from flask import make_response
return make_response("Insufficient rights", 403)

if command == 'turnOn':
if "username" in data and data["username"] != "":
self._tasmota_logger.debug("Using authentication for %s." % "{ip}".format(**data))
self.turn_on("{ip}".format(**data),"{idx}".format(**data),username="{username}".format(**data),password="{password}".format(**data))
self.turn_on("{ip}".format(**data),"{idx}".format(**data),username="{username}".format(**data),password="{password}".format(**data),backlog_delay="{backlog_delay}".format(**data))
else:
self.turn_on("{ip}".format(**data),"{idx}".format(**data))
elif command == 'turnOff':
if "username" in data and data["username"] != "":
self._tasmota_logger.debug("Using authentication for %s." % "{ip}".format(**data))
self.turn_off("{ip}".format(**data),"{idx}".format(**data),username="{username}".format(**data),password="{password}".format(**data))
else:
self.turn_off("{ip}".format(**data),"{idx}".format(**data),username="{username}".format(**data),password="{password}".format(**data),backlog_delay="{backlog_delay}".format(**data))
else:
self.turn_off("{ip}".format(**data),"{idx}".format(**data))
elif command == 'checkStatus':
if "username" in data and data["username"] != "":
Expand All @@ -196,30 +208,36 @@ def on_api_command(self, command, data):
elif command == 'sysCommand':
self._tasmota_logger.debug("Running system command %s." % "{cmd}".format(**data))
os.system("{cmd}".format(**data))

##~~ Gcode processing hook

def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs):
if gcode:
if cmd.startswith("M8") and cmd.count(" ") >= 2:
plugip = cmd.split()[1]
plugidx = cmd.split()[2]
plugidx = cmd.split()[2]
for plug in self._settings.get(["arrSmartplugs"]):
if plug["ip"].upper() == plugip.upper() and plug["idx"] == plugidx and plug["gcodeEnabled"]:
if cmd.startswith("M80"):
t = threading.Timer(int(plug["gcodeOnDelay"]),self.turn_on, [plug["ip"],plug["idx"]],{'username': plug["username"],'password': plug["password"]})
if plug["sysCmdOn"] and plug["sysRunCmdOn"] != "":
s = threading.Timer(int(plug["sysCmdOnDelay"]), os.system, [plug["sysRunCmdOn"]])
s.start()
t = threading.Timer(int(plug["gcodeOnDelay"]),self.turn_on, [plug["ip"],plug["idx"]],{'username': plug["username"],'password': plug["password"],'backlog_delay': plug["backlog_on_delay"]})
t.start()
self._tasmota_logger.debug("Received M80 command, attempting power on of %s index %s." % (plugip,plugidx))
return
elif cmd.startswith("M81"):
t = threading.Timer(int(plug["gcodeOffDelay"]),self.turn_off, [plug["ip"],plug["idx"]],{'username': plug["username"],'password': plug["password"]})
if plug["sysCmdOff"] and plug["sysRunCmdOff"] != "":
s = threading.Timer(int(plug["sysCmdOffDelay"]), os.system, [plug["sysRunCmdOff"]])
s.start()
t = threading.Timer(int(plug["gcodeOffDelay"]),self.turn_off, [plug["ip"],plug["idx"]],{'username': plug["username"],'password': plug["password"],'backlog_delay': plug["backlog_off_delay"]})
t.start()
self._tasmota_logger.debug("Received M81 command, attempting power off of %s index %s." % (plugip,plugidx))
return
else:
return
return


##~~ Softwareupdate hook

Expand Down
9 changes: 7 additions & 2 deletions octoprint_tasmota/static/css/tasmota.css
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#navbar_plugin_tasmota > a > i.on {
/* #navbar_plugin_tasmota > a > i.on {
color: #00FF00 !important;
}
Expand All @@ -8,12 +8,17 @@
#navbar_plugin_tasmota > a > i.unknown {
color: #808080 !important;
}
} */

#navbar_plugin_tasmota > a > div.tasmota_label {
display: none;
}

#navbar_plugin_tasmota > a > i.unknown::after {
content: " ?";
font-family: "Helvetica Neue",Helvetica,Arial,sans-serif;
} }

/* TouchUI - Show Label */
#touch #navbar_plugin_tasmota > a > div.tasmota_label {
padding-left: 5px;
Expand Down
Loading

0 comments on commit 1b8b068

Please sign in to comment.