-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththingspeak
executable file
·57 lines (52 loc) · 1.93 KB
/
thingspeak
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python
# To run: set environment variable REDIS_URL to the input instance,
# and pass the ThingSpeak channel ID and API write key as the CLI arguments:
# $ REDIS_URL="redis://input_host" ./thingspeak TS_CHANNEL_ID TS_CHANNEL_API_WRITE_KEY
# set LOC to a (LAT, LNG) tuple if you want to include location information
import time
import json
import urllib
import urllib2
import sys
from datetime import datetime
from rpjios.SubscriberBase import PSubscriber
SEND_EVERY = 20 # samples
FIELDS = ['mc_1p0', 'mc_2p5', 'mc_4p0', 'mc_10p0', 'nc_0p5', 'nc_2p5', 'nc_10p0', 'typical_particle_size']
CHANNEL_ID = int(sys.argv[1])
API_WRITE_KEY = sys.argv[2]
LOC = None
WRITE_URL = "https://api.thingspeak.com/update.json?api_key={}".format(API_WRITE_KEY)
msg_c = 1
def msg_rx(msg):
global msg_c
if not msg_c % SEND_EVERY:
_d = json.loads(msg["data"])
(ts, val) = (_d["ts"], _d["value"])
_pl = {
"created_at": datetime.fromtimestamp(ts).isoformat(),
"channel_id": CHANNEL_ID
}
if LOC is not None:
_pl["latitude"] = LOC[0]
_pl["longitude"] = LOC[1]
for fi in range(0, len(FIELDS)):
_pl["field{}".format(fi+1)] = val[FIELDS[fi]]
_params = urllib.urlencode(_pl)
_url = "{}&{}".format(WRITE_URL, _params)
_resp = urllib2.urlopen(_url)
_rread = _resp.read()
_rc = _resp.getcode()
try:
if _rc != 200:
print "Request failure ({})! URL: {}".format(_rc, _url)
elif type(_rread) == str:
print "Success: entry #{} added".format(json.loads(_rread)["entry_id"])
else:
print "Unknown failure: {} {}".format(_rc, _rread)
except Exception as e:
print "Unknown failure: {} ({})".format(e, _rc)
msg_c = 1
msg_c += 1
p = PSubscriber("*SPS30*").add_listener(msg_rx)
while 1:
time.sleep(0.5)