From 3c3f11c3907c67f7d3967dbe7cb671cdea78cdc6 Mon Sep 17 00:00:00 2001 From: MariaPoliti Date: Mon, 20 Nov 2023 16:42:10 -0800 Subject: [PATCH] Added check for Active tool and included decorator to enforce check for tool actions. Added is_active_tool attribute to Tool base class --- science_jubilee/Machine.py | 9 +++++---- science_jubilee/tools/Camera.py | 4 +++- science_jubilee/tools/Pipette.py | 31 ++++++++++++++++++++---------- science_jubilee/tools/Syringe.py | 17 ++++++++++------ science_jubilee/tools/Tool.py | 15 ++++++++++++++- science_jubilee/tools/WebCamera.py | 5 +++-- 6 files changed, 57 insertions(+), 24 deletions(-) diff --git a/science_jubilee/Machine.py b/science_jubilee/Machine.py index 1fa8826..39a1f31 100644 --- a/science_jubilee/Machine.py +++ b/science_jubilee/Machine.py @@ -718,12 +718,10 @@ def reload_tool(self, tool: Tool = None): #TODO: Unload tool method @requires_safe_z - def pickup_tool(self, tool_id: Union[int, str, Tool] = None): + def pickup_tool(self, tool_id: Union[int, str, Tool]): """Pick up the tool specified by tool id.""" #TODO: Make sure axis limits are checked and not exceeded when picking up pipette - if isinstance( - tool_id, int - ): # Accept either tool index, tool name, or reference to the tool itself + if isinstance( tool_id, int): # Accept either tool index, tool name, or reference to the tool itself if tool_id in self.tools: tool_index = tool_id else: @@ -756,6 +754,7 @@ def pickup_tool(self, tool_id: Union[int, str, Tool] = None): # self.safe_z_movement() self.gcode(f"T{tool_index}") self.active_tool_index = tool_index + self.tools[tool_index]['tool'].is_active_tool = True @requires_safe_z def park_tool(self): @@ -763,6 +762,8 @@ def park_tool(self): self.safe_z_movement() self.gcode("T-1") # Update the cached value to prevent read delays. + current_tool_index= self.active_tool_index + self.tools[current_tool_index]['tool'].is_active_tool = False self._active_tool_index = -1 diff --git a/science_jubilee/tools/Camera.py b/science_jubilee/tools/Camera.py index 6cc5db9..1c809a4 100644 --- a/science_jubilee/tools/Camera.py +++ b/science_jubilee/tools/Camera.py @@ -1,4 +1,4 @@ -from .Tool import Tool, ToolStateError +from .Tool import Tool, ToolStateError, requires_active_tool import cv2 import matplotlib @@ -57,6 +57,7 @@ def get_camera_indices(self): i -= 1 return arr + @requires_active_tool def get_frame(self, resolution=[1200, 1200], uvc=False): with picamera.PiCamera() as camera: camera.resolution = (1200, 1200) @@ -87,6 +88,7 @@ def show_frame(self, frame, grid=False, save=False): def get_show_frame(self): self.show_frame(self.get_frame()) + @requires_active_tool def video_stream(self, camera_index=0): cap = cv2.VideoCapture( camera_index diff --git a/science_jubilee/tools/Pipette.py b/science_jubilee/tools/Pipette.py index c3cbba1..278363a 100644 --- a/science_jubilee/tools/Pipette.py +++ b/science_jubilee/tools/Pipette.py @@ -3,7 +3,7 @@ import os from labware.Labware import Labware, Well, Location -from .Tool import Tool, ToolStateError, ToolConfigurationError +from .Tool import Tool, ToolStateError, ToolConfigurationError, requires_active_tool from typing import Tuple, Union @@ -14,13 +14,11 @@ def tip_check(func): """ def wrapper(self, *args, **kwargs): if self.has_tip == False: - raise ToolStateError ("Error: tip needs to be attached before aspirating liquid") + raise ToolStateError ("Error: No tip is attached. Cannot complete this action") else: func(self,*args, **kwargs) return wrapper - - class Pipette(Tool): """ A class representation of an Opentrons OT2 pipette. @@ -100,7 +98,6 @@ def __init__(self, machine, index, name, brand, model, max_volume, self._machine = machine self.has_tip = False # TODO: add a way to change this to True/False and check before performing action with tool - self.is_active_tool = False self.first_available_tip = None self.tool_offset = self._machine.tool_z_offsets[self.index] self.is_primed = False @@ -150,7 +147,7 @@ def _getxyz(location: Union[Well, Tuple, Location]): raise ValueError("Location should be of type Well or Tuple") return x,y,z - + def vol2move(self, vol): """Converts desired volume in uL to a movement of the pipette motor axis @@ -163,6 +160,7 @@ def vol2move(self, vol): return dv + @requires_active_tool def prime(self, s=2500): """Moves the plunger to the low-point on the pipette motor axis to prepare for further commands Note::This position should not engage the pipette tip plunger @@ -173,6 +171,7 @@ def prime(self, s=2500): self._machine.move_to(v=self.zero_position, s = s, wait=True) self.is_primed = True + @requires_active_tool def _aspirate(self, vol: float, s:int = 2000): """Moves the plunger upwards to aspirate liquid into the pipette tip @@ -191,7 +190,8 @@ def _aspirate(self, vol: float, s:int = 2000): end_pos = float(pos['V']) + dv self._machine.move_to(v=end_pos, s=s ) - + + @requires_active_tool @tip_check def aspirate(self, vol: float, location : Union[Well, Tuple, Location], s:int = 2000): """Moves the pipette to the specified location and aspirates the desired volume of liquid @@ -218,6 +218,7 @@ def aspirate(self, vol: float, location : Union[Well, Tuple, Location], s:int = self._machine.move_to(z=z) self._aspirate(vol, s=s) + @requires_active_tool @tip_check def _dispense(self,vol: float, s:int = 2000): """Moves the plunger downwards to dispense liquid out of the pipette tip @@ -240,6 +241,7 @@ def _dispense(self,vol: float, s:int = 2000): # raise ToolStateError ("Error : The volume to be dispensed is greater than what was aspirated") self._machine.move_to(v = end_pos, s=s ) + @requires_active_tool @tip_check def dispense(self, vol: float, location :Union[Well, Tuple, Location], s:int = 2000): """Moves the pipette to the specified location and dispenses the desired volume of liquid @@ -271,6 +273,7 @@ def dispense(self, vol: float, location :Union[Well, Tuple, Location], s:int = 2 self._machine.move_to(z=z) self._dispense(vol, s=s) + @requires_active_tool @tip_check def transfer(self, vol: float, source_well: Union[Well, Tuple, Location], destination_well :Union[Well, Tuple, Location] , s:int = 3000, @@ -353,7 +356,8 @@ def transfer(self, vol: float, source_well: Union[Well, Tuple, Location], # if new_tip == 'always': #TODO: need to add new_tip option! - + + @requires_active_tool @tip_check def blowout(self, s : int = 3000): """Blows out any remaining liquid in the pipette tip @@ -367,6 +371,7 @@ def blowout(self, s : int = 3000): self._machine.move_to(v = self.blowout_position, s=s) self.prime() + @requires_active_tool @tip_check def air_gap(self, vol): """Moves the plunger upwards to aspirate air into the pipette tip @@ -381,6 +386,7 @@ def air_gap(self, vol): self._machine.move_to(z = well.top_ + 20) self._machine.move(v= -1*dv) + @requires_active_tool @tip_check def mix(self, vol: float, n: int, s: int = 5000): """Mixes liquid by alternating aspirate and dispense steps for the specified number of times @@ -406,6 +412,7 @@ def mix(self, vol: float, n: int, s: int = 5000): self.prime(s=s) ## In progress (2023-10-12) To test + @requires_active_tool @tip_check def stir(self, n_times: int = 1, height: float= None): """Stirs the liquid in the current well by moving the pipette tip in a circular motion @@ -484,6 +491,7 @@ def add_tiprack(self, tiprack: Union[Labware, list]): self.first_available_tip = self.tipiterator.next() + @requires_active_tool def _pickup_tip(self, z): """Moves the Jubilee Z-axis upwards to pick up a pipette tip and stops once the tip sensor is triggered @@ -496,7 +504,8 @@ def _pickup_tip(self, z): else: raise ToolStateError("Error: Pipette already equipped with a tip.") #TODO: Should this be an error or a warning? - + + @requires_active_tool def pickup_tip(self, tip_ : Union[Well, Tuple] = None): """Moves the pipette to the specified location and picks up a tip @@ -524,6 +533,7 @@ def pickup_tip(self, tip_ : Union[Well, Tuple] = None): #TODO: This should probably iterate the next available tip so that if you use a tip then replace it, you have to manually specify to go use that tip again rather than it just getting picked up. + @requires_active_tool def return_tip(self, location: Well = None): """Returns the pipette tip to the either the specified location or to where the tip was picked up from @@ -544,7 +554,7 @@ def return_tip(self, location: Well = None): self.has_tip = False self.update_z_offset(tip=False) - + @requires_active_tool def _drop_tip(self): """Moves the plunger to eject the pipette tip @@ -560,6 +570,7 @@ def increment_tip(self): """ self.first_available_tip = self.tipiterator.next() + @requires_active_tool @tip_check def drop_tip(self, location: Union[Well, Tuple]): """Moves the pipette to the specified location and drops the pipette tip diff --git a/science_jubilee/tools/Syringe.py b/science_jubilee/tools/Syringe.py index c3809e3..fa8d02b 100644 --- a/science_jubilee/tools/Syringe.py +++ b/science_jubilee/tools/Syringe.py @@ -1,4 +1,4 @@ -from .Tool import Tool, ToolStateError +from .Tool import Tool, ToolStateError, ToolConfigurationError, requires_active_tool from science_jubilee.labware.Labware import Labware, Well from typing import Tuple, Union import warnings @@ -55,7 +55,8 @@ def check_bounds(self, pos): """Disallow commands outside of the syringe's configured range""" if pos > self.max_range or pos < self.min_range: raise ToolStateError(f"Error: {pos} is out of bounds for the syringe!") - + + @requires_active_tool def _aspirate(self, vol: float, s: int = 2000): """Aspirate a certain number of milliliters.""" de = vol * -1 * self.mm_to_ml @@ -63,7 +64,8 @@ def _aspirate(self, vol: float, s: int = 2000): end_pos = float(pos[self.e_drive]) + de self.check_bounds(end_pos) self._machine.move(de=de, wait = True) - + + @requires_active_tool def _dispense(self, vol, s: int = 2000): """Dispense a certain number of milliliters.""" de = vol * self.mm_to_ml @@ -71,7 +73,8 @@ def _dispense(self, vol, s: int = 2000): end_pos = float(pos[self.e_drive]) + de self.check_bounds(end_pos) self._machine.move(de=de, wait = True) - + + @requires_active_tool def aspirate( self, vol: float, @@ -95,7 +98,8 @@ def aspirate( self._machine.move_to(x=x, y=y) self._machine.move_to(z=z) self._aspirate(vol, s=s) - + + @requires_active_tool def dispense( self, vol: float, @@ -121,7 +125,8 @@ def dispense( self._machine.move_to(x=x, y=y) self._machine.move_to(z=z) self._dispense(vol, s=s) - + + @requires_active_tool def transfer( self, vol: float, diff --git a/science_jubilee/tools/Tool.py b/science_jubilee/tools/Tool.py index e586a02..0314d35 100644 --- a/science_jubilee/tools/Tool.py +++ b/science_jubilee/tools/Tool.py @@ -16,6 +16,8 @@ def __init__(self, index, name, **kwargs): raise ToolConfigurationError("Incorrect usage: load_tool(, , **kwargs)") self.index = index self.name = name + self.is_active_tool = False + for k,v in kwargs.items(): setattr(self, k, v ) @@ -27,4 +29,15 @@ def post_load(self): #add a park tool method that every tool config can define to do things that need to be done pre or post parking #ex: make sure pipette has dropped tips before parking - \ No newline at end of file + +def requires_active_tool(func): + """Decorator to ensure that a tool cannot complete an action unless it is the + current active tool. + """ + # print('check') + def wrapper(self, *args, **kwargs): + if self.is_active_tool == False: + raise ToolStateError (f"Error: Tool {self.name} is not the current `Active Tool`. Cannot perform this action") + else: + func(self,*args, **kwargs) + return wrapper \ No newline at end of file diff --git a/science_jubilee/tools/WebCamera.py b/science_jubilee/tools/WebCamera.py index d9c4987..3587bf8 100644 --- a/science_jubilee/tools/WebCamera.py +++ b/science_jubilee/tools/WebCamera.py @@ -10,7 +10,7 @@ from labware.Labware import Well, Location from typing import Tuple, Union -from .Tool import Tool +from .Tool import Tool, requires_active_tool class Camera(Tool): @@ -50,7 +50,7 @@ def _getxyz(location: Union[Well, Tuple, Location]): return x,y,z - + @requires_active_tool def _capture_image(self, timeout = 10): """ Capture image from raspberry pi and write to file @@ -64,6 +64,7 @@ def _capture_image(self, timeout = 10): return response.content + @requires_active_tool def capture_image(self, location: Union[Well, Tuple]): x, y, z = self._getxyz(location)