-
Notifications
You must be signed in to change notification settings - Fork 159
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for YF-S201 flow rate sensor
- Loading branch information
Showing
2 changed files
with
96 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
""" | ||
YF-S201 Flow Rate Sensor | ||
Example configuration: | ||
sensor_modules: | ||
- name: yfs201 | ||
module: yfs201 | ||
sensor_inputs: | ||
- name: flow_rate1 | ||
module: yfs201 | ||
pin: 0 | ||
digits: 0 | ||
interval: 10 | ||
""" | ||
|
||
from typing import Dict, cast | ||
from ...types import CerberusSchemaType, ConfigType, SensorValueType | ||
from . import GenericSensor | ||
|
||
REQUIREMENTS = ("gpiozero",) | ||
|
||
|
||
class YFS201: | ||
""" | ||
YF-S201 Flow Rate Sensor class | ||
Multiple instances support multiple sensors on different pins | ||
""" | ||
|
||
def __init__(self, gpiozero, name: str, pin: int) -> None: | ||
self.name = name | ||
self.pin = gpiozero.DigitalInputDevice(pin) | ||
self.pin.when_activated = self.count_pulse | ||
self.count = 0 | ||
|
||
def count_pulse(self) -> None: | ||
"""Increment pulse count.""" | ||
self.count += 1 | ||
|
||
def reset_count(self) -> None: | ||
"""Reset pulse count.""" | ||
self.count = 0 | ||
|
||
def flow_rate(self, sample_window: int) -> float: | ||
"""Return flow rate in liters per minute. | ||
From YF-S201 manual: | ||
Pluse Characteristic:F=7Q(L/MIN). | ||
2L/MIN=16HZ 4L/MIN=32.5HZ 6L/MIN=49.3HZ 8L/MIN=65.5HZ 10L/MIN=82HZ | ||
Pulse frequency (Hz) / 7.0 = flow rate in L/min | ||
sample_window is in seconds, so hz is pulse_count / sample_window | ||
""" | ||
hertz = self.count / sample_window | ||
return hertz / 7.0 | ||
|
||
def get_value(self, interval: int) -> float: | ||
"""Return flow rate in L/min over interval seconds and reset count.""" | ||
flow_rate = cast(float, self.flow_rate(interval)) | ||
self.reset_count() | ||
return flow_rate | ||
|
||
|
||
class Sensor(GenericSensor): | ||
""" | ||
YF-S201 Flow Rate Sensor | ||
""" | ||
|
||
SENSOR_SCHEMA: CerberusSchemaType = { | ||
"pin": dict( | ||
type="integer", | ||
required=True, | ||
empty=False, | ||
) | ||
} | ||
|
||
def setup_module(self) -> None: | ||
# pylint: disable=import-outside-toplevel,import-error | ||
import gpiozero # type: ignore | ||
|
||
self.gpiozero = gpiozero | ||
self.sensors: Dict[str, YFS201] = {} | ||
|
||
def setup_sensor(self, sens_conf: ConfigType) -> None: | ||
sensor = YFS201( | ||
gpiozero=self.gpiozero, name=sens_conf["name"], pin=sens_conf["pin"] | ||
) | ||
self.sensors[sensor.name] = sensor | ||
|
||
def get_value(self, sens_conf: ConfigType) -> SensorValueType: | ||
return self.sensors[sens_conf["name"]].get_value(sens_conf["interval"]) |