-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcron.py
208 lines (159 loc) · 6.76 KB
/
cron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#! /usr/bin/env python
# coding: utf-8
"""
Script to water the plants. To be run periodically with cron.
WARNING: this script can be run on a Raspberry Pi only.
"""
import argparse
import functools
import json
import logging as log
import math
import os
import time

import RPi.GPIO as GPIO

from inout.ultrasonic import UltrasonicSensor
from inout.relay import Relay
# Log file: "cron.log", stored in the same directory as this script
CODE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(CODE_DIR, "cron.log")
# Each record is written as "<timestamp> [<level, at most 4 chars>] <message>"
log.basicConfig(
    filename=LOG_FILE,
    format='%(asctime)s [%(levelname).4s] %(message)s',
    level=log.INFO,
)
def parse_args():
    """Build the command line parser and parse the program arguments.

    Options are split in four groups: Raspberry settings, water container
    geometry, plant watering and security limits.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.
    """
    cli = argparse.ArgumentParser()

    # Raspberry
    raspberry_group = cli.add_argument_group("Raspberry.")
    raspberry_group.add_argument(
        "--pinconfig",
        type=str,
        default=os.path.join(CODE_DIR, "config", "pins.json"),
        help="Path to the pins configuration file (default: %(default)s).",
    )
    raspberry_group.add_argument(
        "--temperature",
        type=int,
        default=20,
        help="Temperaturen in Celsius, to compute sound speed (default: %(default)d).",
    )
    raspberry_group.add_argument(
        "--valve_relay",
        type=int,
        choices=list(range(1, 9)),
        default=1,
        help="Relay of the electrovalve (default: %(default)d).",
    )

    # Water container
    container_group = cli.add_argument_group("Water container.")
    container_group.add_argument(
        "--height",
        type=float,
        default=3,
        help="Height of the water container, in meters (default: %(default)d).",
    )
    container_group.add_argument(
        "--diameter",
        type=float,
        default=1,
        help="Diameter of the water container, in meters (default: %(default)d).",
    )

    # Plant watering
    watering_group = cli.add_argument_group("Plants watering.")
    watering_group.add_argument(
        "--liters",
        type=float,
        default=5,
        help="Number of liters to spread (default: %(default)d).",
    )

    # Security
    security_group = cli.add_argument_group("Security limits.")
    security_group.add_argument(
        "--min_volume",
        type=float,
        default=350,
        help="Minimum number of liters to allow to spread water (default: %(default)d).",
    )
    security_group.add_argument(
        "--rain_volume",
        type=float,
        default=250,
        help="Volume added to detect a rain fall (default: %(default)d).",
    )
    security_group.add_argument(
        "--time_limit",
        type=float,
        default=2700,
        help="Watering time limit, for security, in seconds (default: %(default)d).",
    )

    return cli.parse_args()
def load_pin_config(path):
    """Load the pin config file and check it.

    Args:
        path: Path to a JSON file whose top-level object has exactly the
            keys "relay", "trigger" and "echo".

    Returns:
        The parsed configuration as a dictionary.

    Raises:
        ValueError: If the top-level JSON value is not an object, or if its
            keys differ from the expected ones.
    """
    expected_keys = {"relay", "trigger", "echo"}
    with open(path, "r") as f:
        pins = json.load(f)
    # Explicit check rather than `assert`: asserts are removed when Python
    # runs with -O and carry no message, which would leave an empty entry
    # in the log when the failure is reported.
    if not isinstance(pins, dict):
        raise ValueError(
            "Invalid pin config file {}: expected a JSON object, got {}.".format(
                path, type(pins).__name__))
    if set(pins) != expected_keys:
        raise ValueError(
            "Invalid pin config file {}: expected keys {}, got {}.".format(
                path, sorted(expected_keys), sorted(pins)))
    return pins
def gpio(function):
    """Decorator running ``function`` between GPIO setup and cleanup.

    - Set mode to GPIO.BCM before calling ``function``.
    - Log, instead of propagating, a KeyboardInterrupt (ctrl+c) or any other
      exception raised by ``function``.
    - Always call GPIO.cleanup afterwards.

    Args:
        function: Callable to wrap. Positional and keyword arguments given
            to the wrapper are forwarded to it.

    Returns:
        The wrapping callable. It returns whatever ``function`` returns, or
        None when an exception was caught and logged.
    """
    # functools.wraps keeps the name and docstring of the wrapped function
    @functools.wraps(function)
    def gpio_function(*args, **kwargs):
        GPIO.setmode(GPIO.BCM)
        try:
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            log.warning("Interruption from user.")
        except Exception as e:
            log.critical(e)
        finally:
            # Runs on success, on error and on ctrl+c
            log.info("Cleaning GPIO.")
            GPIO.cleanup()
        return None
    return gpio_function
def water_level(water_container, sensor_value):
    """Convert a sensor measurement into the volume left in the container.

    The volume is the one of a cylinder whose base has the container radius
    and whose height is the container height minus the measurement.

    Args:
        water_container: Dictionary with the geometry of the container in
            meters, read through its "radius" and "height" keys.
        sensor_value: Measurement in meters.

    Returns:
        The remaining volume in the container, in liters.
    """
    radius = water_container["radius"]
    height = water_container["height"]
    base_area = math.pi * (radius ** 2)
    cubic_meters = base_area * (height - sensor_value)
    # 1 cubic meter = 1000 liters
    return cubic_meters * 1000
def _read_last_volume(log_path):
    """Return the most recent volume (in liters) found in the log, or None.

    The log is scanned from its last line to its first one, looking for
    entries tagged "[VOLUME]". Entries whose value cannot be parsed as a
    float are skipped, so that one malformed line does not make every later
    run fail.
    """
    with open(log_path, "r") as f:
        lines = f.readlines()
    for line in reversed(lines):
        if "[VOLUME]" not in line:
            continue
        try:
            return float(line.split("[VOLUME] ")[-1].split(" L")[0])
        except ValueError:
            continue
    return None


@gpio
def main():
    """Main function to water plants.

    Steps:
    - Parse the command line and load the pin config file.
    - Measure the water volume and compare it with the last logged one.
    - Do nothing if the volume is below ``--min_volume``, or if it grew by
      more than ``--rain_volume`` since the last logged volume.
    - Otherwise drive the valve relay until ``--liters`` have been used
      (confirmed by 3 consecutive measures) or ``--time_limit`` is reached.

    Raises:
        RuntimeError: If watering is needed but the relay selected with
            ``--valve_relay`` is marked as unavailable in the pin config.
    """
    # Log a new line for readability
    log.info("\n\n### New call ###\n")

    # Parse command line
    args = parse_args()

    # Load pin config file
    pins = load_pin_config(args.pinconfig)

    # Initialize ultrasonic sensor
    sensor = UltrasonicSensor(trig=pins["trigger"],
                              echo=pins["echo"],
                              temperature=args.temperature)

    # Initialize relays (a null pin marks a relay as unavailable)
    relays = {}
    for relay_id, pin in pins["relay"].items():
        if pin is None:
            log.debug("Relay n°{} is not available (according to the pin config file)".format(relay_id))
            relays[int(relay_id)] = None
            continue
        relays[int(relay_id)] = Relay(pin)

    # The valve is a specific relay
    valve = relays[args.valve_relay]

    # Water container geometry (height and radius)
    water_container = {
        "height": args.height,
        "radius": args.diameter / 2,
    }

    # Measure the initial water level
    measure = sensor.median_measure(rep=101)  # Do measure
    volume = water_level(water_container, measure)  # Convert it in a volume in liters

    # Load the last volume logged
    last_volume = _read_last_volume(LOG_FILE)
    if last_volume is None:
        log.debug("[WATERING] No water volume measured yet.")
    else:
        log.info("[WATERING] Last volume: {:.2f} L".format(last_volume))

    # Log the new volume
    # NOTE(review): the measurement is labelled "cm" in the log lines while
    # water_level() documents it in meters - confirm the sensor unit.
    log.info("[VOLUME] {:.2f} L / {:.4f} cm (before watering)".format(volume, measure))

    # If not enough water in the container, do nothing
    if volume < args.min_volume:
        log.warning("[SECURITY] Not enough water: {:.2f} L. No watering.".format(volume))
        return

    # If it rained, do nothing
    if last_volume is not None and volume - last_volume > args.rain_volume:
        log.warning("[SECURITY] It rained {:.2f} L. No watering.".format(volume - last_volume))
        return

    # Fail with an explicit message rather than an AttributeError on None
    if valve is None:
        raise RuntimeError(
            "Valve relay {} is not available (according to the pin config file).".format(
                args.valve_relay))

    # Water the plants, with a time limit for security.
    counter_stop = 0
    start_time = time.time()
    log.info("[WATERING] Starting.")
    valve.close()
    try:
        while counter_stop < 3:
            time.sleep(10)
            new_measure = sensor.median_measure(rep=21)  # Do measure
            new_volume = water_level(water_container, new_measure)  # Convert it in a volume in liters

            # Check that the desired volume is reached
            # (3 consecutive measures are required before stopping)
            if volume - new_volume > args.liters:
                counter_stop += 1
            else:
                counter_stop = 0

            # Time limit to prevent from accidentally emptying the container
            if time.time() - start_time > args.time_limit:
                log.warning("[SECURITY] Time limit reached, stopping watering.")
                break

            log.info("[VOLUME] {:.2f} L / {:.4f} cm (while watering).".format(new_volume, new_measure))
    finally:
        # Stop watering, even when a measure fails or the user interrupts,
        # so that the valve is never left in its watering state.
        valve.open()

    log.info("[WATERING] Stopping. {:.2f} L used. Watered during {} seconds.".format(volume - new_volume, time.time() - start_time))
    log.info("[VOLUME] {:.2f} L / {:.4f} cm (after watering).".format(new_volume, new_measure))
# Run the watering routine only when this file is executed as a script
if __name__ == "__main__":
    main()