diff --git a/basil/HL/bronkhorst_elflow.py b/basil/HL/bronkhorst_elflow.py index d0439c8f..7616ea79 100644 --- a/basil/HL/bronkhorst_elflow.py +++ b/basil/HL/bronkhorst_elflow.py @@ -7,6 +7,7 @@ import logging import struct +import time from basil.HL.RegisterHardwareLayer import HardwareLayer @@ -16,160 +17,94 @@ class Bronkhorst_ELFLOW(HardwareLayer): ''' Bronkhorst ELFLOW + Manual can be found here: + https://www.bronkhorst.com/getmedia/77a1438f-e547-4a79-95ad-53e81fd38a97/917027-Manual-RS232-interface.pdf ''' + CMDS = { + 'get_measure_flow': ':06800401210120', + 'get_capacity': ':068004014D014D', + 'get_control_mode': ':06800401040104', + 'set_control_mode': ':0580010104', + 'set_setpoint': ':0680010121', + 'get_setpoint': ':06800401210121', + } + def __init__(self, intf, conf): self.debug = 0 self.node = "80" super(Bronkhorst_ELFLOW, self).__init__(intf, conf) + self.pre_time = time.time() def init(self): super(Bronkhorst_ELFLOW, self).init() def write(self, cmd): - cmd_s = "" - for c in cmd: - cmd_s = cmd_s + "%02X" % c - cmd_s = ":%02X%s%s" % (len(cmd) + 1, self.node, cmd_s) - if self.debug != 0: - logger.debug("ELFLOW.write() %s" % str(cmd_s)) - self._intf.write(cmd_s) + if time.time() - self.pre_time < 1.0: + time.sleep(1.0) + self._intf.write(str(cmd)) + self.pre_time = time.time() def read(self): - ret_s = self._intf.read() - if self.debug != 0: - logger.debug("ELFLOW.read() %s" % str(ret_s)) - if len(ret_s) < 5 or ret_s[0] != ":" or ret_s[3:5] != self.node: - logger.debug("ELFLOW.read() format error ret=%s" % str(ret_s)) - return [] - ret_len = int(ret_s[1:3]) - if ret_len * 2 != len(ret_s[3:-2]): - logger.debug("ELFLOW.read() data lenth error ret=%s" % str(ret_s)) - return [] - ret = [] - for i in range(ret_len - 1): - ret.append(int(ret_s[5 + 2 * i:5 + 2 * (i + 1)], 16)) - return ret + ret = self._intf.read() + if len(ret) < 2 or ret[-2:] != "\r\n": + logger.warning("read() termination error") + return ret.strip() def set_setpoint(self, value): - cmd = [1, 1, 0x21, (value >> 8) & 0xFF, value & 0xFF] - self.write(cmd) + """value range from 0 - 32000 + """ + + if not isinstance(value, int): + raise ValueError(f"Given value has to be of type integer, is {type(value)}!") + + hex_val = hex(value)[2:] # [2:] to remove the 0x from the beginning of the hex number + command = f"{self.CMDS['set_setpoint']}" + f"{hex_val.zfill(2)}" # hex should have at least two digits + self._intf.write(command) ret = self.read() - if len(ret) != 3: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 0 and ret[1] == 0 and ret[2] == 5: - return 0 - else: - logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret)) - return -1 + return ret def get_setpoint(self): - cmd = [4, 1, 0x21, 1, 0x21] - self.write(cmd) + self._intf.write(self.CMDS['get_setpoint']) ret = self.read() - if len(ret) != 5: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]: - return ((ret[3] << 8) & 0xFF00) | (ret[4] & 0xFF) - else: - logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret)) - return -1 - - def set_control_mode(self, value): + answer_in_hex = ret[11:] # read from the 11th digits to translate what point is set + answer = int(answer_in_hex, 16) + return answer + + def set_mode(self, value): """ 0 setpoint source RS232 3 valve close - 4 freeze valuve out + 4 freeze valve out 8 valve fully open - 20 valve steering (valve=setpoint)""" - cmd = [1, 1, 4, value & 0xFF] - self.write(cmd) - ret = self.read() - if len(ret) != 3: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 0 and ret[1] == 0 and ret[2] == 4: - return 0 - else: - logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret)) - return -1 - - def get_control_mode(self): - cmd = [4, 1, 1, 1, 4] - self.write(cmd) - ret = self.read() - if len(ret) != 4: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]: - return ret[3] - else: - logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret)) - return -1 - - def set_valve_output(self, value): - cmd = [1, 114, 0x41, (value >> 24) & 0xFF, (value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF] - self.write(cmd) + 20 valve steering """ + hex_val = hex(value)[2:] # [2:] to remove the 0x from the beginning of the hex number + command = f"{self.CMDS['set_control_mode']}" + f"{hex_val.zfill(2)}" # hex should have at least two digits + self._intf.write(command) ret = self.read() - if len(ret) != 3: - logger.debug("ELFLOW.set_valve_output() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 0 and ret[1] == 0 and ret[2] == 7: - return 0 - else: - logger.debug("ELFLOW.set_valve_output() ret error ret=%s" % str(ret)) - return -1 - - def get_valve_output(self): - cmd = [4, 114, 0x41, 114, 0x41] - self.write(cmd) - ret = self.read() - if len(ret) != 7: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]: - return ((ret[3] << 24) & 0xFF000000) | ((ret[4] << 16) & 0xFF0000) | ((ret[5] << 8) & 0xFF00) | (ret[6] & 0xFF) - else: - logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret)) - return -1 - - def set_controller_speed(self, value): - value = struct.unpack('> 24) & 0xFF, (value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF] - self.write(cmd) - ret = self.read() - if len(ret) != 3: - logger.debug("ELFLOW.set_controller_speed() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 0 and ret[1] == 0 and ret[2] == 7: - return 0 - else: - logger.debug("ELFLOW.set_controller_speed() ret error ret=%s" % str(ret)) - return -1 - - def get_controller_speed(self): - cmd = [4, 114, 0x41, 114, 0x40 + 30] - self.write(cmd) + return ret + + def get_mode(self): + self._intf.write(self.CMDS['get_control_mode']) ret = self.read() - if len(ret) != 7: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]: - return struct.unpack('!f', chr(ret[3]) + chr(ret[4]) + chr(ret[5]) + chr(ret[6]))[0] - else: - logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret)) - return -1 - - def get_measure(self): - cmd = [4, 1, 0x21, 1, 0x20] - self.write(cmd) + answer_in_hex = ret[11:] # read from the 11th digits to translate what mode is on + answer = int(answer_in_hex, 16) + return answer + + def get_flow(self): + """This should give the flow in l/min + """ + + # first get the max capacity in % + self._intf.write(self.CMDS['get_capacity']) ret = self.read() - if len(ret) != 5: - logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret)) - return -1 - elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]: - return ((ret[3] << 8) & 0xFF00) | (ret[4] & 0xFF) - else: - logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret)) - return -1 + answer_in_hex = ret[11:] # read from the 11th digits to translate what the capacity is + cap_100 = struct.unpack('!f', bytes.fromhex(answer_in_hex))[0] + + # now measure the flow + self._intf.write(self.CMDS['get_measure_flow']) + ret1 = self.read() + answer_in_hex = ret1[11:] + answer = int(answer_in_hex, 16) + + val = answer / 32000 * cap_100 + return val diff --git a/basil/HL/julaboFP50.py b/basil/HL/julaboFP50.py new file mode 100644 index 00000000..c8d152aa --- /dev/null +++ b/basil/HL/julaboFP50.py @@ -0,0 +1,111 @@ +# +# ------------------------------------------------------------ +# Copyright (c) All rights reserved +# SiLab, Institute of Physics, University of Bonn +# ------------------------------------------------------------ +# + +""" +This script is used to communicate with the chiller julabo fp50 +""" + +import logging +import time + +from basil.HL.HardwareLayer import HardwareLayer + + +logger = logging.getLogger(__name__) + + +class julaboFP50(HardwareLayer): + ''' Driver for the Julabo FP50 chiller. + A simple protocol via crossed null modem serial port is used with baud rate of 9600. + All commands were taken from JulaboFP50 manual. + ''' + + CMDS = {'get_temp': 'in_sp_00', + 'set_temp': 'out_sp_00', + 'get_curr_temp': 'in_pv_00', + 'get_version': 'version', + 'get_status': 'status', + 'start': 'out_mode_05 1', + 'stop': 'out_mode_05 0', + 'set_power': 'out_sp_06' + } + + def __init__(self, intf, conf): + super(julaboFP50, self).__init__(intf, conf) + self.pre_time = time.time() + + def init(self): + super(julaboFP50, self).init() + + def read(self): + ret = self._intf.read() + if len(ret) < 2 or ret[-2:] != "\r\n": + logger.warning("read() termination error") + return ret[:-2] + + def write(self, cmd): + if time.time() - self.pre_time < 1.0: + time.sleep(1.0) + self._intf.write(str(cmd)) + self.pre_time = time.time() + + def get_version(self): + ''' Read identifier + ''' + self.write(self.CMDS['get_version']) + ret = self.read() + return ret + + def start_chiller(self): + ''' Start chiller + ''' + self.write(self.CMDS['start']) + + def stop_chiller(self): + ''' Stop chiller + ''' + self.write(self.CMDS['stop']) + + def get_status(self): + ''' Get status + ''' + self.write(self.CMDS['get_status']) + ret = self.read() + logger.debug("status:{:s}".format(ret)) + try: + tmp = ret.split(" ", 1) + status = int(tmp[0]) + status_str = tmp[1:] + except (ValueError, AttributeError): + logger.warning("get_status() wrong format: {}".format(repr(ret))) + status = -99 + status_str = ret + return status, status_str + + def get_set_temp(self): + '''get the set temperature + ''' + self.write(self.CMDS['get_temp']) + ret = self.read() + return float(ret) + + def set_temp(self, temp): + '''set the temperature + ''' + self.write(f"{self.CMDS['set_temp']}={temp}") + + def get_temp(self): + '''get the current temperature in chiller + ''' + self.write(self.CMDS['get_curr_temp']) + ret = self.read() + return float(ret) + + def set_power(self, variable): + '''Set the power for heater/cooler via serial interface (positive value for heating, negative value for cooling) + ''' + self.write(f"{self.CMDS['set_power']}={variable}") diff --git a/examples/lab_devices/arduino_ntc_readout.yaml b/examples/lab_devices/arduino_ntc_readout.yaml index e86ce1ba..c51924a9 100644 --- a/examples/lab_devices/arduino_ntc_readout.yaml +++ b/examples/lab_devices/arduino_ntc_readout.yaml @@ -2,7 +2,7 @@ transfer_layer: - name : Serial type : Serial init : - port : /dev/ttyUSB1 + port : /dev/ttyUSB2 baudrate : 115200 timeout: 2 read_termination: "\r\n" # Needs to be double-quoted string for YAML to parse this correctly diff --git a/examples/lab_devices/julaboFP50.py b/examples/lab_devices/julaboFP50.py new file mode 100644 index 00000000..29034468 --- /dev/null +++ b/examples/lab_devices/julaboFP50.py @@ -0,0 +1,15 @@ +from basil.dut import Dut + +dut = Dut('julaboFP50_pyserial.yaml') +dut.init() + + +# turn on: +# dut["chiller"].start_chiller() + +# dut["chiller"].set_temp(15) # set temp + +print("Status: {}".format(dut["chiller"].get_status())) + +# turn off: +# dut["chiller"].stop_chiller() diff --git a/examples/lab_devices/julaboFP50_pyserial.yaml b/examples/lab_devices/julaboFP50_pyserial.yaml new file mode 100644 index 00000000..d565ea60 --- /dev/null +++ b/examples/lab_devices/julaboFP50_pyserial.yaml @@ -0,0 +1,21 @@ +transfer_layer: + - name : Serial + type : Serial + init : + port : /dev/ttyUSB0 + read_termination : "\r\n" + write_termination : "\r\n" + baudrate : 9600 + timeout : 5.0 + parity : "N" ### serial.PARITY_NONE + xonxoff : True # software handshake on + rtscts : False + dsrdtr : False + + +hw_drivers: + - name : chiller + type : julaboFP50 + interface : Serial + init: + device: julabo FP50