-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathenviro-pi.py
315 lines (283 loc) · 12.3 KB
/
enviro-pi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
print("Loading Wait ...")
import os
PROG_VER = "2.3"
PROG_NAME = os.path.basename(__file__)
HORIZ_LINE = "----------------------------------------------------------------------"
print(HORIZ_LINE)
print("%s ver %s written by Pavol Odlevak and Claude Pageau" % (PROG_NAME, PROG_VER))
print("\nRead/Save SenseHat Humidity, Temperature and Barometric Pressure data to a sqlite3 database.")
print("Run webserver.py to View Data History graphs via Web Browser.\n")
print("Optional: Upload data to the Weather Underground Personal Weather Station (PWS)")
print(HORIZ_LINE)
import time
import datetime
import sys
import logging
try:
import requests
except ImportError:
import pip._vendor.requests as requests
try:
import sqlite3 as lite
except ImportError:
print("Install sqlite3 Python library per below. Then retry")
print("sudo apt install sqlite3")
sys.exit(1)
# Module start-up: locate the script, load config.py, configure logging,
# validate STATION_UPLOAD_MINUTES, connect to the Sense HAT and build the
# sqlite3 database path. Any failure prints/logs a message and exits with
# status 1.

# Get information about this script including name, launch path, etc.
# This allows script to be renamed or relocated to another directory
mypath = os.path.abspath(__file__)  # Find the full path of this python script
# Directory holding the script. The trailing separator is kept so the value
# has the same shape as before (directory path ending in a separator).
base_dir = os.path.join(os.path.dirname(mypath), "")

# Read configuration settings from the config.py beside this script
config_file_path = os.path.join(base_dir, "config.py")
if not os.path.exists(config_file_path):
    print("ERROR: Missing config.py file - File Not Found %s" % config_file_path)
    print("Please investigate problem. Exiting %s ver %s" % (PROG_NAME, PROG_VER))
    sys.exit(1)
try:
    # Pulls every configuration variable into this module's namespace.
    from config import *
except ImportError:
    print("WARN : Problem reading configuration variables from %s" % config_file_path)
    sys.exit(1)

if LOGGING_ON:
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)-8s %(funcName)-10s %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        )
else:
    print("WARNING: Logging Disabled per LOGGING_ON = %s" % LOGGING_ON)

# STATION_UPLOAD_MINUTES is later used as a modulo divisor against the
# current minute, so it must be set, at least 1 and no more than 60.
# %s is used (not %i) because the value being reported may be None.
if (STATION_UPLOAD_MINUTES is None) or not (1 <= STATION_UPLOAD_MINUTES <= 60):
    logging.error("STATION_UPLOAD_MINUTES= %s Cannot be empty, less than 1 or greater than 60 minutes",
                  STATION_UPLOAD_MINUTES)
    sys.exit(1)

# Load SenseHat and Check for Errors
try:
    from sense_hat import SenseHat
except ImportError:
    print("Install Sense Hat Python library per below. Then retry")
    print("sudo apt install sense-hat")
    sys.exit(1)
try:
    sense = SenseHat()
except OSError as err_msg:
    print("ERROR - Problem accessing Sense Hat ...")
    print(" %s" % err_msg)
    print("Please investigate problem. Exiting %s ver %s" % (PROG_NAME, PROG_VER))
    sys.exit(1)

# Create path to Sqlite3 Database File
sqlite3_db_path = os.path.join(SQLITE3_DB_DIR, SQLITE3_DB_NAME)
# 8x8 images for the Sense HAT LED matrix: up arrow, down arrow, bars and
# hourglass. Each image is a flat list of 64 colour entries, row by row.
# modified from https://www.raspberrypi.org/learning/getting-started-with-the-sense-hat/worksheet/

# colours as [red, green, blue] components
b = [0, 0, 255]  # blue
g = [0, 255, 0]  # green
r = [255, 0, 0]  # red
e = [0, 0, 0]  # empty

# red arrow pointing up: head in the top four rows, shaft below
arrow_up = (
    [e, e, e, r, r, e, e, e]
    + [e, e, r, r, r, r, e, e]
    + [e, r, e, r, r, e, r, e]
    + [r, e, e, r, r, e, e, r]
    + [e, e, e, r, r, e, e, e] * 4
)

# blue arrow pointing down: shaft in the top four rows, head below
arrow_down = (
    [e, e, e, b, b, e, e, e] * 4
    + [b, e, e, b, b, e, e, b]
    + [e, b, e, b, b, e, b, e]
    + [e, e, b, b, b, b, e, e]
    + [e, e, e, b, b, e, e, e]
)

# two empty rows, two red rows, two blue rows, two empty rows
bars = [e] * 16 + [r] * 16 + [b] * 16 + [e] * 16

# blue hourglass: rows narrow towards the middle, then widen again
hourglass = (
    [b] * 8
    + [e] + [b] * 6 + [e]
    + [e] * 2 + [b] * 4 + [e] * 2
    + [e] * 3 + [b] * 2 + [e] * 3
    + [e] * 3 + [b] * 2 + [e] * 3
    + [e] * 2 + [b] * 4 + [e] * 2
    + [e] + [b] * 6 + [e]
    + [b] * 8
)
def init_db():
    """Make sure the sqlite3 database holds the sensehat table.

    Connects to sqlite3_db_path, runs CREATE TABLE IF NOT EXISTS for the
    sensehat table (epoch, humidity, pressure, temp_hum, temp_prs), then
    commits and closes the connection so no handle is left open.

    Exits the program with status 1 if the database file cannot be found
    on disk afterwards. Errors raised by sqlite3 itself are not caught.
    """
    query = """CREATE TABLE IF NOT EXISTS
               sensehat(epoch INT,
                        humidity REAL,
                        pressure REAL,
                        temp_hum REAL,
                        temp_prs REAL)"""
    con = lite.connect(sqlite3_db_path)
    try:
        cur = con.cursor()
        cur.execute(query)
        con.commit()
    finally:
        # Always release the handle; main() opens its own connection.
        con.close()
    if not os.path.exists(sqlite3_db_path):
        print("ERROR: Could Not Find sqlite3 database at %s" % sqlite3_db_path)
        print("Please Investigate Problem. Exiting %s ver %s" % (PROG_NAME, PROG_VER))
        sys.exit(1)
def c_to_f(input_temp):
    """Return input_temp converted from Celsius to Fahrenheit."""
    scaled = input_temp * 1.8
    return scaled + 32
def get_cpu_temp():
    """Return the CPU temperature reported by ``vcgencmd measure_temp``.

    Runs the command through the OS, reads the first line of its output
    and strips the "temp=" prefix and "'C" suffix before converting the
    remainder to a float.

    Raises:
        ValueError: if the output cannot be parsed as a number (for
            example when the command produced no output).
    """
    # 'borrowed' from https://www.raspberrypi.org/forums/viewtopic.php?f=104&t=111457
    # The with-block closes the pipe once the line has been read.
    with os.popen('vcgencmd measure_temp') as pipe:
        res = pipe.readline()
    return float(res.replace("temp=", "").replace("'C\n", ""))
def get_temp():
    """Return the Sense HAT temperature reading less SENSEHAT_TEMP_OFFSET."""
    return sense.get_temperature() - SENSEHAT_TEMP_OFFSET
def _read_all():
    """Read the Sense HAT once and return the rounded values.

    Returns:
        Tuple (temp_c, temp_f, humidity, pressure, pressure_inhg). Both
        temperatures come from one reading and both pressures from one
        reading, so each pair is consistent.
    """
    raw_temp = get_temp()
    raw_pressure = sense.get_pressure()
    temp_c = round(raw_temp, 1)
    temp_f = round(c_to_f(raw_temp), 1)
    humidity = round(sense.get_humidity(), 0)
    pressure = round(raw_pressure, 1)  # hpa
    # convert pressure from millibars to inHg for the upload
    pressure_inhg = round(raw_pressure * 0.0295300, 2)
    return temp_c, temp_f, humidity, pressure, pressure_inhg


def _save_row(con, humidity, pressure, temp_c):
    """Insert one reading into the sensehat table, or warn if the DB is off.

    A failed insert is logged and otherwise ignored so the loop keeps running.
    """
    if not SQLITE3_DB_ON:
        logging.warning(" SQLITE3: DB DISABLED per SQLITE3_DB_ON = %s", SQLITE3_DB_ON)
        return
    logging.info(" SQLITE3: INSERT Data INTO table sensehat at %s", sqlite3_db_path)
    # temp_c is stored in both the temp_hum and temp_prs columns.
    command = "INSERT INTO sensehat VALUES(%i, %0.2f, %0.2f, %0.2f, %0.2f)" % (
        int(time.time()), humidity, pressure, temp_c, temp_c)
    with con:
        cur = con.cursor()
        try:
            cur.execute(command)
        except Exception as err:
            logging.error("DB Update Failed: %s\nError: %s", command, str(err))


def _show_diff(temp_f, last_temp):
    """Show bars (unchanged), a red up arrow or a blue down arrow."""
    if temp_f == last_temp:
        sense.set_pixels(bars)
    elif temp_f > last_temp:
        sense.set_pixels(arrow_up)
    else:
        sense.set_pixels(arrow_down)


def _upload_wu(temp_f, humidity, pressure_inhg):
    """Send one reading to Weather Underground, or warn if uploads are off.

    Any exception from the request is logged as a warning and swallowed so
    a network problem does not stop the main loop.
    """
    if not STATION_UPLOAD_ON:
        logging.warning(" UPLOAD: Disabled per STATION_UPLOAD_ON = %s",
                        STATION_UPLOAD_ON)
        return
    # From http://wiki.wunderground.com/index.php/PWS_-_Upload_Protocol
    weather_data = {"action": "updateraw",
                    "ID": STATION_ID,
                    "PASSWORD": STATION_KEY,
                    "dateutc": "now",
                    "tempf": str(temp_f),
                    "humidity": str(humidity),
                    # baromin is in inches of mercury, not hPa
                    "baromin": str(pressure_inhg),
                    }
    try:
        logging.info(" CONNECT: Station ID %s", STATION_ID)
        # Bounded wait so a stalled connection cannot hang the loop forever.
        response = requests.get(STATION_WU_URL, params=weather_data, timeout=30)
        html_status = response.text.upper().rstrip('\n')
        logging.info(" UPLOAD : %s", html_status)
        if SENSEHAT_SCREEN_ON:
            # Top-left pixel: green on success, red otherwise.
            if html_status == "SUCCESS":
                sense.set_pixel(0, 0, g)
            else:
                sense.set_pixel(0, 0, r)
    except Exception as err:
        logging.warning(" %s", err)


def main():
    """Read the Sense HAT forever, saving and uploading on a schedule.

    The clock is polled once a second. On every 10-second mark the sensors
    are read; the first such reading in a minute that is zero or divisible
    by STATION_UPLOAD_MINUTES is logged, stored in sqlite3 (when
    SQLITE3_DB_ON), shown as a trend image (when SENSEHAT_SCREEN_ON) and
    uploaded (when STATION_UPLOAD_ON).

    Never returns normally; the database connection, if one was opened, is
    closed when the loop is left by an exception such as KeyboardInterrupt.
    """
    if SENSEHAT_SCREEN_ON:
        sense.set_pixels(hourglass)
    con = None
    if SQLITE3_DB_ON:
        init_db()  # Initialize the sqlite database
        con = lite.connect(sqlite3_db_path)
    last_temp = round(c_to_f(get_temp()), 1)
    # on startup, use the previous minute (wrapping 0 back to 59) so the
    # first reading is treated as belonging to a new minute
    last_minute = (datetime.datetime.now().minute - 1) % 60
    mode = "Upload" if STATION_UPLOAD_ON else "Reading"
    logging.info("Next %s in %i minutes. Waiting ...", mode, STATION_UPLOAD_MINUTES)
    try:
        # infinite loop to continuously Read/Upload weather values
        while True:
            now = datetime.datetime.now()
            # are we at a 10 second interval (second 0 included)?
            if now.second % 10 == 0:
                temp_c, temp_f, humidity, pressure, pressure_inhg = _read_all()
                current_minute = now.minute
                # only act once per minute
                if current_minute != last_minute:
                    last_minute = current_minute
                    # is minute zero, or divisible by STATION_UPLOAD_MINUTES?
                    if (current_minute == 0) or ((current_minute % STATION_UPLOAD_MINUTES) == 0):
                        logging.info(" READING: Temp: %sF (%sC), Press: %s hPa, Hum: %s%%",
                                     temp_f, temp_c, pressure, humidity)
                        _save_row(con, humidity, pressure, temp_c)
                        # did the temperature go up or down?
                        if SENSEHAT_SCREEN_ON:
                            _show_diff(temp_f, last_temp)
                        # remember this temperature for the next comparison
                        last_temp = temp_f
                        _upload_wu(temp_f, humidity, pressure_inhg)
                        logging.info("Next %s in %i minutes. Waiting ...",
                                     mode, STATION_UPLOAD_MINUTES)
            # wait a second then check again instead of spinning the CPU;
            # a missed 10-second mark only delays the work to the next one
            time.sleep(1)
    finally:
        if con is not None:
            con.close()
if __name__ == "__main__":
    # Script entry point: greet on the Sense HAT display, take a first
    # temperature reading, then hand over to main() until Ctrl-c.
    banner_colours = dict(text_colour=[255, 255, 0], back_colour=[0, 0, 255])
    try:
        logging.info("CONNECT: senseHat ....")
        sense = SenseHat()
        sense.set_rotation(SENSEHAT_SCREEN_ROTATE)
        # scroll the start-up text across the display, then blank it
        sense.show_message(SENSEHAT_INIT_MSG, **banner_colours)
        sense.clear()
        # first temperature reading, reported in the log line below
        last_temp = round(c_to_f(get_temp()), 1)
    except Exception as connect_err:
        logging.error("CONNECT: %s", connect_err)
        sys.exit(1)
    else:
        logging.info("READING: SUCCESS SenseHat OK. Temp is %dF (%dC)", last_temp, get_temp())

    try:
        main()
    except KeyboardInterrupt:
        # Say goodbye on the display, leave it blank, then exit.
        sense.show_message("BYE", **banner_colours)
        time.sleep(2)
        sense.clear()
        print("\nUser Exited with Ctrl-c")
        print("Bye ....")
        sys.exit()