Skip to content

Commit

Permalink
File system based sensors implementation for voltage and current sens…
Browse files Browse the repository at this point in the history
…ors (#426)

* File system based sensors implementation for voltage and current sensors.

* Added error handling for missing data in yaml file

* Added test code.

* Addressed review comments

* Fixed missing inclusion

* Fixed UT error

* Fixed test case failures in other repos.

* Addressed review comments

* Fixed UT

* Review comment

* Removed empty line in diff.

* Removed changes in chassis base tests
  • Loading branch information
bmridul authored Mar 4, 2024
1 parent 3d35404 commit 56921d8
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 0 deletions.
143 changes: 143 additions & 0 deletions sonic_platform_base/sensor_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""
sensor_fs.py
File system based sensors implementation.
"""

from sonic_platform_base.sensor_base import SensorBase
from sonic_platform_base.sensor_base import VoltageSensorBase
from sonic_platform_base.sensor_base import CurrentSensorBase
import logging


class SensorFs(SensorBase):
"""Implementation of file system based sensor class"""

__defaults = {
'name': '',
'sensor': '',
'high_thresholds': ['N/A', 'N/A', 'N/A'], #minor, major, critical
'low_thresholds': ['N/A', 'N/A', 'N/A'], #minor, major, critical
'position': -1,
}

def __init__(self, sensor_type='sensor', **kw):

super(SensorFs, self).__init__()

if 'sensor' not in kw:
raise Exception('Failed to initialize sensor')

for k,v in self.__defaults.items():
setattr(self, k, kw.get(k,v))

if (len(self.high_thresholds) != 3 or len(self.low_thresholds) != 3):
raise Exception('{}: Missing sensor thresholds'.format(self.name))

self.minimum_sensor = self.get_value()
self.maximum_sensor = self.minimum_sensor

def get_name(self):
"""Returns the sensor name"""
return self.name

def get_value(self):
"""Returns the sensor measurement"""
try:
with open(self.sensor) as f:
return int(f.readline().rstrip())
except:
return None

def get_high_threshold(self):
"""Returns the sensor high threshold value"""
return self.high_thresholds[1]

def get_low_threshold(self):
"""Returns the sensor low threshold value"""
return self.low_thresholds[1]

def set_high_threshold(self, value):
"""Sets the sensor high threshold value"""
self.high_thresholds[1] = value
return True

def set_low_threshold(self, value):
"""Sets the sensor low threshold value"""
self.low_thresholds[1] = value
return True

def get_high_critical_threshold(self):
"""Returns the sensor critical high threshold value"""
return self.high_thresholds[2]

def get_low_critical_threshold(self):
"""Returns the sensor critical low threshold value"""
return self.low_thresholds[2]

def set_high_critical_threshold(self, value):
"""Sets the sensor critical high threshold value"""
self.high_thresholds[2] = value
return True

def set_low_critical_threshold(self, value):
"""Sets the sensor critical low threshold value"""
self.low_thresholds[2] = value
return True

def get_minimum_recorded(self):
"""Retrieves the minimum recorded sensor measurement"""
tmp = self.get_value()
if tmp is None:
return None
if self.minimum_sensor is None or tmp < self.minimum_sensor:
self.minimum_sensor = tmp
return self.minimum_sensor

def get_maximum_recorded(self):
"""Retrieves the maximum recorded sensor measurement"""
tmp = self.get_value()
if tmp is None:
return None
if self.maximum_sensor is None or tmp > self.maximum_sensor:
self.maximum_sensor = tmp
return self.maximum_sensor

def get_position_in_parent(self):
"""Retrieves 1-based relative physical position in parent device"""
return self.position

@staticmethod
def factory(sensor_cls, sensors_data):
"""Factory method for retrieving a list of Sensor objects"""
logging.basicConfig()
logger = logging.getLogger()

result = []

for idx, sensor in enumerate(sensors_data):
sensor['position'] = idx + 1
try:
result.append(sensor_cls(**sensor))
except Exception as e:
logger.warning('Sensor.factory: {}'.format(e))

return result


class VoltageSensorFs(SensorFs, VoltageSensorBase):
"""File system based voltage sensor class"""

DEVICE_TYPE = "voltage_sensor"

def __init__(self, **kw):
super(VoltageSensorFs, self).__init__(self.DEVICE_TYPE, **kw)


class CurrentSensorFs(SensorFs, CurrentSensorBase):
"""File systems based Current sensor class"""

DEVICE_TYPE = "current_sensor"

def __init__(self, **kw):
super(CurrentSensorFs, self).__init__(self.DEVICE_TYPE, **kw)
1 change: 1 addition & 0 deletions tests/sensor_data/VSENSOR1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
900
83 changes: 83 additions & 0 deletions tests/sensor_fs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'''
Test Sensor_fs module
'''

import yaml
import os
from unittest import mock
from sonic_platform_base.sensor_fs import VoltageSensorFs
from sonic_platform_base.sensor_fs import CurrentSensorFs

yaml_data = """
voltage_sensors:
- name : VSENSOR1
sensor: 'sensor_data/VSENSOR1'
high_thresholds: [ 1000, 1050, 1080 ]
low_thresholds: [ 800, 850, 890 ]
- name : VSENSOR2
sensor: 'sensor_data/VSENSOR2'
high_thresholds: [ 800, 850, 870 ]
low_thresholds: [ 600, 620, 750 ]
current_sensors:
- name : CSENSOR1
sensor: 'sensor_data/CSENSOR1'
high_thresholds: [ 1000, 1050, 1080 ]
low_thresholds: [ 800, 850, 890 ]
- name : CSENSOR2
sensor: 'sensor_data/CSENSOR2'
high_thresholds: [ 800, 850, 870 ]
low_thresholds: [ 600, 620, 750 ]
"""

class TestSensorFs:
'''
Collection of SensorFs test methods
'''

@staticmethod
def test_sensor_fs():
'''
Test voltage sensors
'''
sensors_data = yaml.safe_load(yaml_data)

vsensors = VoltageSensorFs.factory(VoltageSensorFs, sensors_data['voltage_sensors'])
csensors = CurrentSensorFs.factory(CurrentSensorFs, sensors_data['current_sensors'])

assert(vsensors[0].get_name() == 'VSENSOR1')
assert(vsensors[0].get_position_in_parent() == 1)

vsensors[0].set_high_threshold(800)
assert(vsensors[0].get_high_threshold() == 800)
vsensors[0].set_high_critical_threshold(900)
assert(vsensors[0].get_high_critical_threshold() == 900)

vsensors[0].set_low_threshold(500)
assert(vsensors[0].get_low_threshold() == 500)
vsensors[0].set_low_critical_threshold(400)
assert(vsensors[0].get_low_critical_threshold() == 400)

assert(csensors[0].get_name() == 'CSENSOR1')
assert(csensors[0].get_position_in_parent() == 1)

csensors[0].set_high_threshold(800)
assert(csensors[0].get_high_threshold() == 800)
csensors[0].set_high_critical_threshold(900)
assert(csensors[0].get_high_critical_threshold() == 900)

csensors[0].set_low_threshold(500)
assert(csensors[0].get_low_threshold() == 500)
csensors[0].set_low_critical_threshold(400)
assert(csensors[0].get_low_critical_threshold() == 400)

assert(vsensors[0].get_minimum_recorded() == None)

tests_path = os.path.dirname(os.path.abspath(__file__))
vsensor_path = os.path.join(tests_path, "sensor_data/VSENSOR1")
vsensors[0].sensor = vsensor_path
print(vsensor_path, vsensors[0])

assert(vsensors[0].get_value() == 900)
assert(vsensors[0].get_minimum_recorded() == 900)
assert(vsensors[0].get_maximum_recorded() == 900)

0 comments on commit 56921d8

Please sign in to comment.