-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
227 lines (175 loc) · 6.33 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import datetime
import json
import os
from flask import Flask, request, Response
from flask_mqtt import Mqtt
import psycopg2
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Maximum number of recent events reported per sensor by get_history().
HISTORY_LENGTH = 10

# Connection targets; each can be overridden through the environment
# variable of the same name.
MQTT_URL = os.getenv("MQTT_URL", "mqtt.svc.cave.avaruuskerho.fi")
DB_URL = os.getenv("DB_URL", "postgres://postgres@localhost:5432")

# ---------------------------------------------------------------------------
# End of configuration
# ---------------------------------------------------------------------------

# Display names for the numeric severities that the MQTT logging callback
# receives, keyed by the severity value.
LOG_LEVELS = {
    0x01: "INFO",
    0x02: "NOTICE",
    0x04: "WARNING",
    0x08: "ERROR",
    0x10: "DEBUG",
}
# In-memory activity store. Not referenced by any code visible in this file
# (events are written to the database instead); the name is kept so the
# module namespace stays unchanged.
activity = {}

# Flask application plus the settings the Flask-MQTT extension reads from it.
app = Flask(__name__)
app.config.update(
    MQTT_BROKER_URL=MQTT_URL,
    MQTT_BROKER_PORT=1883,      # default port for non-TLS connections
    MQTT_USERNAME=None,         # set when the broker requires authentication
    MQTT_PASSWORD=None,         # set when the broker requires authentication
    MQTT_KEEPALIVE=5,           # seconds between keep-alive pings to the broker
    MQTT_TLS_ENABLED=False,     # TLS is disabled (testing setup)
)
mqtt = Mqtt(app)

# Database connection and cursor, opened once at import time and exposed
# module-wide.
conn = psycopg2.connect(DB_URL)
cursor = conn.cursor()
def add_activity(id: str) -> None:
    """
    Record activity from the given sensor with the current date/time

    The insert runs on its own cursor inside ``with conn``: psycopg2 commits
    the transaction when the block succeeds and rolls it back when it raises,
    so a failed insert cannot leave the module-wide connection stuck in an
    aborted transaction.

    :param id: ID of the sensor that registered activity
    :raises psycopg2.Error: if the insert fails (raised after the rollback)
    """
    # Naive UTC timestamp, the same value utcnow() produced (utcnow() is
    # deprecated since Python 3.12). It stays naive because the readers in
    # this file re-attach UTC with replace(tzinfo=...).
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # A per-call cursor avoids sharing one cursor's state with other callers;
    # psycopg2 cursors are not thread-safe.
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO sensor_events (sensor, date) VALUES (%s, %s);",
            (id, now),
        )
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc) -> None:
    """
    Handle an event when the MQTT connection is made. Subscribe to topic in this event so that
    if the connection is lost and reconnects, the subscriptions get made again

    :param client: unused
    :param userdata: unused
    :param flags: unused
    :param rc: unused
    """
    print("Connected to MQTT")

    # Subscribe to every sensor topic directly below '/iot/cave/motion0/'.
    # MQTT has no '*' wildcard: a filter ending in '*' only matches a topic
    # literally named '*'. '+' is the single-level wildcard, so each match has
    # exactly one extra segment, which the message handler uses as sensor ID.
    mqtt.subscribe('/iot/cave/motion0/+')
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message) -> None:
    """
    React to a message published on a subscribed topic by recording activity for its sensor.

    Only the motion topics are subscribed to, so any message means that motion was registered.

    :param client: unused
    :param userdata: unused
    :param message: An object containing information (including the topic and the payload) of the message
    """
    # The sensor ID is the final topic segment,
    # e.g. '/iot/cave/motion0/123456' -> '123456'.
    sensor_id = message.topic.rsplit('/', 1)[-1]
    add_activity(sensor_id)
    print(f"Logged activity on sensor {sensor_id}")
@mqtt.on_log()
def handle_logging(client, userdata, level, buf) -> None:
    """
    Handle an event where the MQTT library wants to log a message by printing it,
    prefixed with the name of its severity.

    Messages of every level are printed, DEBUG included (no filtering is applied).
    A level that is missing from LOG_LEVELS is printed with its numeric value
    instead of raising KeyError inside the callback.

    :param client: unused
    :param userdata: unused
    :param level: The level/severity of the message
    :param buf: Message contents
    """
    label = LOG_LEVELS.get(level, f"LEVEL {level}")
    # To silence DEBUG chatter, return early here when label == 'DEBUG'.
    print(f"{label}: {buf}")
def get_sensors():
    """
    Get the list of known sensors from the database

    A sensor is "known" once it has at least one row in ``sensor_events``.

    :return: list of distinct sensor ids
    :raises psycopg2.Error: if the query fails (the transaction is rolled back first)
    """
    # Per-call cursor: psycopg2 cursors are not thread-safe, and a shared one
    # lets one caller's execute() replace another caller's pending result set.
    # `with conn` ends the transaction (commit, or rollback on error), so a
    # failed query cannot leave the connection in an aborted state.
    with conn, conn.cursor() as cur:
        cur.execute("SELECT DISTINCT sensor FROM sensor_events;")
        return [row[0] for row in cur.fetchall()]
def get_history(sensor):
    """
    Get last $HISTORY_LENGTH values from the given sensor

    :param sensor: id of the sensor to query
    :return: list of ISO 8601 timestamps marked as UTC, newest first
    :raises psycopg2.Error: if the query fails (the transaction is rolled back first)
    """
    # Per-call cursor (psycopg2 cursors are not thread-safe); `with conn`
    # commits on success and rolls back on error.
    with conn, conn.cursor() as cur:
        # The placeholder must NOT be wrapped in quotes: psycopg2 quotes and
        # escapes the bound value itself, so '%s' would yield ''value''.
        cur.execute(
            "SELECT date FROM sensor_events "
            "WHERE sensor = %s "
            "ORDER BY date DESC LIMIT %s;",
            (sensor, HISTORY_LENGTH),
        )
        rows = cur.fetchall()

    # Attach UTC to each returned timestamp before formatting it.
    return [row[0].replace(tzinfo=datetime.timezone.utc).isoformat() for row in rows]
def get_history_ranges():
    """
    Get the boundaries of the activity periods of each sensor over the last 48 hours

    Only sensors with a row in ``sensor_aliases`` are included (inner join).
    Within the window, an event is reported as

    * ``START`` when the sensor's previous event is more than one hour older,
      or there is no previous event, and as
    * ``END`` when the sensor's next event is more than one hour newer, or
      there is no next event.

    A single isolated event therefore yields both a START and an END row.

    :return: list of ``(sensor, alias, date, type)`` rows ordered by date
             ascending, with START before END for equal dates
    :raises psycopg2.Error: if the query fails (the transaction is rolled back first)
    """
    # next_date is computed with LAG over the descending order, i.e. it is the
    # following event in time.
    query = """
        WITH differences AS (
            SELECT
                sensor,
                alias,
                date,
                LAG(date, 1) OVER (
                    PARTITION BY sensor
                    ORDER BY date ASC) previous_date,
                LAG(date, 1) OVER (
                    PARTITION BY sensor
                    ORDER BY date DESC) next_date
            FROM sensor_events JOIN sensor_aliases ON sensor_events.sensor = sensor_aliases.id
            WHERE date BETWEEN NOW() - INTERVAL '48h' AND NOW())
        (
            SELECT
                sensor,
                alias,
                date,
                'START' as type
            FROM differences
            WHERE date - previous_date > INTERVAL '1h' OR previous_date IS NULL
        ) UNION (
            SELECT
                sensor,
                alias,
                date,
                'END' as type
            FROM differences
            WHERE next_date - date > INTERVAL '1h' OR next_date IS NULL
        )
        ORDER BY date ASC, type DESC;
    """
    # Per-call cursor (psycopg2 cursors are not thread-safe). `with conn` ends
    # the transaction afterwards, so the connection does not idle inside an
    # open transaction; PostgreSQL's NOW() is fixed at transaction start, so a
    # lingering transaction would skew the 48h window of later calls.
    with conn, conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()
def get_alias(sensor):
    """
    Get the alias of a sensor, if known

    :param sensor: id of the sensor
    :return: sensor alias, or None when the sensor has no row in ``sensor_aliases``
    :raises psycopg2.Error: if the query fails (the transaction is rolled back first)
    """
    # Per-call cursor (psycopg2 cursors are not thread-safe); `with conn`
    # commits on success and rolls back on error.
    with conn, conn.cursor() as cur:
        # The placeholder must NOT be wrapped in quotes: psycopg2 quotes and
        # escapes the bound value itself, so '%s' would yield ''value''.
        cur.execute(
            "SELECT alias FROM sensor_aliases "
            "WHERE id = %s;",
            (sensor,),
        )
        row = cur.fetchone()
    return row[0] if row else None
@app.route("/")
def view_activity():
    """
    A Flask route that responds to requests on the URL '/'. Builds a JSON array from the stored data.

    Each element describes one known sensor:

    * ``id``: the sensor id
    * ``alias``: the sensor's alias, or null when it has none
    * ``history``: the most recent timestamps returned by get_history(), newest first
    * ``latest``: the first entry of ``history``, or null when the history is empty

    Adding a ``pretty`` query parameter (any value) returns indented JSON with sorted keys.

    :return: a Response with mimetype application/json
    """
    response = []
    for sensor in get_sensors():
        history = get_history(sensor)
        response.append({
            "id": sensor,
            "alias": get_alias(sensor),
            "history": history,
            # Guard against an empty history (for example if the sensor's rows
            # disappear between the two queries) instead of raising IndexError.
            "latest": history[0] if history else None,
        })

    if 'pretty' in request.args:
        body = json.dumps(response, sort_keys=True, indent=4, separators=(',', ': '))
    else:
        body = json.dumps(response)
    return Response(body, mimetype='application/json')
@app.route("/history/v1")
@app.route("/history")
def view_history():
    """
    A Flask route serving '/history' and '/history/v1'.

    Groups the rows returned by get_history_ranges() by sensor and returns a
    JSON array with one object per sensor: its ``id``, its ``alias`` and an
    ``events`` list of ``{"date": <ISO timestamp marked as UTC>, "event": <row type>}``
    entries in the order the rows were returned.

    Adding a ``pretty`` query parameter (any value) returns indented JSON with sorted keys.

    :return: a Response with mimetype application/json
    """
    by_sensor = {}
    for sensor_id, alias, when, kind in get_history_ranges():
        # The first row seen for a sensor creates its entry; later rows reuse it.
        entry = by_sensor.setdefault(
            sensor_id, {'id': sensor_id, 'alias': alias, 'events': []}
        )
        stamp = when.replace(tzinfo=datetime.timezone.utc).isoformat()
        entry['events'].append({'date': stamp, 'event': kind})

    payload = list(by_sensor.values())

    # Pretty-printing is opt-in through the mere presence of the parameter.
    dump_options = {}
    if 'pretty' in request.args:
        dump_options = dict(sort_keys=True, indent=4, separators=(',', ': '))
    return Response(json.dumps(payload, **dump_options), mimetype='application/json')
if __name__ == "__main__":
    # Run Flask's built-in server only when this file is executed directly,
    # listening on all network interfaces.
    app.run(host="0.0.0.0")