-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservo.py
57 lines (49 loc) · 1.47 KB
/
servo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import asyncio
import json
import ssl
import time

import aioactioncable
# Most recent value received for each control; every control starts at 0.
state = dict.fromkeys(("dial_6", "dial_7", "toggle_6"), 0)

# Experiment number placed in the channel subscription identifier.
experiment_id = '6'

# Keys of ``state`` that update() drives as stepper motors.
steppers = ["dial_6", "dial_7"]

# Last ``toggle_6`` value that update() has acted on.
power = 0
def update():
    """Push the values held in ``state`` out to the hardware.

    For every name in ``steppers`` whose value in ``state`` differs from the
    one recorded in ``stepperPositions``, an ``M<id>:<position>`` line is
    written to ``ser`` and the recorded position is refreshed. After that,
    if ``state["toggle_6"]`` differs from the module-level ``power`` flag,
    the GPIO output is switched and ``power`` is set to the new value.

    NOTE(review): ``stepperPositions``, ``ser`` and ``GPIO`` are not defined
    anywhere in the visible file; they must exist at module level before
    this function is called.
    NOTE(review): the ``time.sleep`` calls block the caller. ``process`` and
    ``ac_recv`` call this function, so the receive loop is presumably
    stalled for the duration of each sleep - confirm this is acceptable.
    """
    # ``power`` is rebound below. Without this declaration Python treats it
    # as a local name and the comparison raises UnboundLocalError.
    global power

    for stepper in steppers:
        new_motor_pos = state[stepper]
        if new_motor_pos == stepperPositions[stepper]:
            continue  # this motor is already where it should be

        print("stepperID: {}, stepperPos: {}".format(stepper, new_motor_pos))
        stepperPositions[stepper] = new_motor_pos
        # "dial_6" is addressed as motor 1; any other stepper as motor 2.
        stepper_id = 1 if stepper == "dial_6" else 2
        inputString = "M{0}:{1}\n".format(stepper_id, new_motor_pos)
        print(inputString)
        ser.write(inputString.encode('utf-8'))
        time.sleep(1)

    requested_power = state["toggle_6"]
    if power != requested_power:
        # NOTE(review): the 0/1 ``power`` flag is passed as the channel
        # argument of GPIO.output, exactly as in the original code. This
        # looks like it should be a dedicated pin constant - confirm.
        if requested_power == 1:
            GPIO.output(power, GPIO.HIGH)
        else:
            time.sleep(5)
            GPIO.output(power, GPIO.LOW)
        power = requested_power
        print("status:", requested_power)
def process(msg):
    """Apply one control message to ``state`` and refresh the hardware.

    Args:
        msg: JSON text encoding an object with a ``control`` entry (the key
            of ``state`` to set) and a ``value`` entry (the value to store).

    Raises:
        json.JSONDecodeError: if ``msg`` is not valid JSON.
        KeyError: if the decoded object lacks ``control`` or ``value``.
    """
    msg_json = json.loads(msg)
    print(f'Processing {msg_json}')
    # Index the decoded object: ``msg`` itself is the raw JSON text and
    # cannot be subscripted with a string key.
    state[msg_json['control']] = msg_json['value']
    update()
async def ac_recv(uri, identifier):
    """Receive messages from a channel subscription and act on those for the Pi.

    Connects to ``uri`` and subscribes with ``identifier``. Each incoming
    message whose ``location`` entry equals ``'pi'`` is handed to
    ``process`` as raw text, sent back on the subscription with ``location``
    replaced by ``'controls'``, and followed by a call to ``update``.
    Messages with any other ``location`` are skipped.
    """
    async with aioactioncable.connect(uri) as connection:
        channel = await connection.subscribe(identifier)
        async for raw in channel:
            payload = json.loads(raw)
            if payload['location'] != 'pi':
                continue
            process(raw)
            await channel.send({**payload, 'location': 'controls'})
            update()
# Script entry: run the receive loop against the local cable endpoint,
# subscribed to this experiment's channel with location 'pi'.
asyncio.run(
    ac_recv(
        uri='ws://127.0.0.1:3000/cable',
        identifier={
            'channel': 'ExperimentChannel',
            'experiment': experiment_id,
            "location": 'pi',
        },
    )
)