-
-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathdb.py
90 lines (71 loc) · 3.64 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from cachetools import cached, TTLCache
import influxdb_client
# InfluxDB connection settings for the Ribbit Network sensor fleet bucket.
INFLUXDB_BUCKET = "frog_fleet"
INFLUXDB_ORG = "Ribbit Network"
INFLUXDB_URL = "https://us-west-2-1.aws.cloud2.influxdata.com/"
# Intentionally raises KeyError at import time when the token is missing,
# so misconfiguration fails fast rather than at first query.
INFLUXDB_TOKEN = os.environ['INFLUXDB_TOKEN']
# One shared client/query API reused by every query helper below.
client = influxdb_client.InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
query_api = client.query_api()
@cached(cache=TTLCache(maxsize=1, ttl=60))
def get_map_data() -> pd.DataFrame:
    """Return the most recent co2/lat/lon reading per sensor.

    Looks back 30 days, drops sensors reporting a (0, 0) location, and
    pivots fields into columns. Result is cached for 60 seconds.
    """
    flux = (
        f'from(bucket:"{INFLUXDB_BUCKET}")'
        '|> range(start:-30d)'
        '|> filter(fn: (r) => r._field == "co2" or r._field == "lat" or r._field == "lon")'
        '|> filter(fn: (r) => r.lat != 0 and r.lon != 0)'
        '|> last()'
        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
        '|> map(fn: (r) => ({r with co2: float(v: r.co2)}))'
        '|> keep(columns: ["_time", "host", "lat", "lon", "co2"])'
    )
    raw = query_api.query_data_frame(flux)
    # The client may hand back a list of frames (one per result table).
    return concat_result(raw)
# `100` chosen arbitrarily, tweak with extreme prejudice
@cached(cache=TTLCache(maxsize=100, ttl=60))
def get_sensor_data(host: str, duration: str, frequency: str) -> pd.DataFrame:
    """Return per-minute readings for one sensor, optionally downsampled.

    `duration` is an Influx duration string (e.g. "30d"); `frequency`
    is a pandas offset alias — resampling happens only for the known
    set of aliases below. Results cached for 60 seconds per argument
    combination.
    """
    flux = (
        f'from(bucket:"{INFLUXDB_BUCKET}")'
        f'|> range(start: -{duration})'
        f'|> filter(fn: (r) => r.host == "{host}")'
        '|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)'
        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
        '|> keep(columns: ["co2", "temperature", "humidity", "lat", "lon", "_time", "baro_pressure"])'
    )
    frame = query_api.query_data_frame(flux)
    if frame.empty:
        return frame
    # Strip the client's bookkeeping columns before handing data out.
    frame = frame.drop(['result', 'table'], axis=1)
    if frequency in ('5min', '10min', '15min', '30min', '1h'):
        frame = frame.set_index('_time').resample(frequency).mean().reset_index()
    return frame
def print_sensor_info():
    """Return a per-sensor status table sorted by freshness.

    Columns: host, last co2 reading (int), hardware version inferred
    from the hostname suffix, and whole days since the last report.
    Despite the name, this returns the DataFrame rather than printing.
    """
    flux = (
        f'from(bucket:"{INFLUXDB_BUCKET}") '
        '|> range(start:-30d) '
        '|> filter(fn:(r) => r._field == "co2") '
        '|> last() '
        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
    )
    frame = concat_result(query_api.query_data_frame(org=INFLUXDB_ORG, query=flux))
    age_seconds = (datetime.now(timezone.utc) - frame["_time"]).dt.total_seconds()
    frame["days_since_last_read"] = (age_seconds / (60 * 60 * 24)).astype(int)
    # v4 boards report under a host ending in "_golioth_esp32s3"; all others are v3.
    frame["version"] = np.where(frame['host'].str.endswith('_golioth_esp32s3'), 'v4', 'v3')
    frame["co2"] = frame["co2"].astype(int)
    ordered = frame.sort_values(by=['days_since_last_read'], ascending=[True])
    return ordered[['host', 'co2', 'version', 'days_since_last_read']]
def concat_result(df):
    """Normalize `query_api.query_data_frame` output to a single DataFrame.

    The influxdb client returns either one DataFrame or a list of
    DataFrames (one per Flux result table); the list case is
    concatenated. Any other input yields None, preserving the
    original best-effort behavior.
    """
    if isinstance(df, list):
        return pd.concat(df)
    if isinstance(df, pd.DataFrame):
        # Bug fix: the original returned the undefined name `dfs` here,
        # raising NameError whenever a single DataFrame was passed in.
        return df
    return None
# python3 ./db.py
# Ad-hoc debugging entry point: requires INFLUXDB_TOKEN in the environment
# and network access to the Influx cloud instance.
if __name__ == '__main__':
    # https://stackoverflow.com/questions/52805115/certificate-verify-failed-unable-to-get-local-issuer-certificate
    # Widen pandas display so whole frames print without truncation.
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    # Alternative helpers to inspect; uncomment the pair you want.
    #df = print_sensor_info()
    #print(df)
    #df = get_map_data()
    #print(df)
    df = get_sensor_data('63b9deb6b679f5d522bda82a_golioth_esp32s3', '30d', '5min')
    print(df)