Skip to content

Commit

Permalink
Added check for Active tool and included decorator to enforce check f…
Browse files Browse the repository at this point in the history
…or tool actions. Added is_active_tool attribute to Tool base class
  • Loading branch information
MariaPoliti committed Nov 21, 2023
1 parent 3f3cdd8 commit 3c3f11c
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 24 deletions.
9 changes: 5 additions & 4 deletions science_jubilee/Machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,12 +718,10 @@ def reload_tool(self, tool: Tool = None):
#TODO: Unload tool method

@requires_safe_z
def pickup_tool(self, tool_id: Union[int, str, Tool] = None):
def pickup_tool(self, tool_id: Union[int, str, Tool]):
"""Pick up the tool specified by tool id."""
#TODO: Make sure axis limits are checked and not exceeded when picking up pipette
if isinstance(
tool_id, int
): # Accept either tool index, tool name, or reference to the tool itself
if isinstance( tool_id, int): # Accept either tool index, tool name, or reference to the tool itself
if tool_id in self.tools:
tool_index = tool_id
else:
Expand Down Expand Up @@ -756,13 +754,16 @@ def pickup_tool(self, tool_id: Union[int, str, Tool] = None):
# self.safe_z_movement()
self.gcode(f"T{tool_index}")
self.active_tool_index = tool_index
self.tools[tool_index]['tool'].is_active_tool = True

@requires_safe_z
def park_tool(self):
"""Park the current tool."""
self.safe_z_movement()
self.gcode("T-1")
# Update the cached value to prevent read delays.
current_tool_index= self.active_tool_index
self.tools[current_tool_index]['tool'].is_active_tool = False
self._active_tool_index = -1


Expand Down
4 changes: 3 additions & 1 deletion science_jubilee/tools/Camera.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .Tool import Tool, ToolStateError
from .Tool import Tool, ToolStateError, requires_active_tool

import cv2
import matplotlib
Expand Down Expand Up @@ -57,6 +57,7 @@ def get_camera_indices(self):
i -= 1
return arr

@requires_active_tool
def get_frame(self, resolution=[1200, 1200], uvc=False):
with picamera.PiCamera() as camera:
camera.resolution = (1200, 1200)
Expand Down Expand Up @@ -87,6 +88,7 @@ def show_frame(self, frame, grid=False, save=False):
def get_show_frame(self):
self.show_frame(self.get_frame())

@requires_active_tool
def video_stream(self, camera_index=0):
cap = cv2.VideoCapture(
camera_index
Expand Down
31 changes: 21 additions & 10 deletions science_jubilee/tools/Pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os

from labware.Labware import Labware, Well, Location
from .Tool import Tool, ToolStateError, ToolConfigurationError
from .Tool import Tool, ToolStateError, ToolConfigurationError, requires_active_tool
from typing import Tuple, Union


Expand All @@ -14,13 +14,11 @@ def tip_check(func):
"""
def wrapper(self, *args, **kwargs):
if self.has_tip == False:
raise ToolStateError ("Error: tip needs to be attached before aspirating liquid")
raise ToolStateError ("Error: No tip is attached. Cannot complete this action")
else:
func(self,*args, **kwargs)
return wrapper



class Pipette(Tool):
""" A class representation of an Opentrons OT2 pipette.
Expand Down Expand Up @@ -100,7 +98,6 @@ def __init__(self, machine, index, name, brand, model, max_volume,
self._machine = machine
self.has_tip = False
# TODO: add a way to change this to True/False and check before performing action with tool
self.is_active_tool = False
self.first_available_tip = None
self.tool_offset = self._machine.tool_z_offsets[self.index]
self.is_primed = False
Expand Down Expand Up @@ -150,7 +147,7 @@ def _getxyz(location: Union[Well, Tuple, Location]):
raise ValueError("Location should be of type Well or Tuple")

return x,y,z

def vol2move(self, vol):
"""Converts desired volume in uL to a movement of the pipette motor axis
Expand All @@ -163,6 +160,7 @@ def vol2move(self, vol):

return dv

@requires_active_tool
def prime(self, s=2500):
"""Moves the plunger to the low-point on the pipette motor axis to prepare for further commands
Note::This position should not engage the pipette tip plunger
Expand All @@ -173,6 +171,7 @@ def prime(self, s=2500):
self._machine.move_to(v=self.zero_position, s = s, wait=True)
self.is_primed = True

@requires_active_tool
def _aspirate(self, vol: float, s:int = 2000):
"""Moves the plunger upwards to aspirate liquid into the pipette tip
Expand All @@ -191,7 +190,8 @@ def _aspirate(self, vol: float, s:int = 2000):
end_pos = float(pos['V']) + dv

self._machine.move_to(v=end_pos, s=s )


@requires_active_tool
@tip_check
def aspirate(self, vol: float, location : Union[Well, Tuple, Location], s:int = 2000):
"""Moves the pipette to the specified location and aspirates the desired volume of liquid
Expand All @@ -218,6 +218,7 @@ def aspirate(self, vol: float, location : Union[Well, Tuple, Location], s:int =
self._machine.move_to(z=z)
self._aspirate(vol, s=s)

@requires_active_tool
@tip_check
def _dispense(self,vol: float, s:int = 2000):
"""Moves the plunger downwards to dispense liquid out of the pipette tip
Expand All @@ -240,6 +241,7 @@ def _dispense(self,vol: float, s:int = 2000):
# raise ToolStateError ("Error : The volume to be dispensed is greater than what was aspirated")
self._machine.move_to(v = end_pos, s=s )

@requires_active_tool
@tip_check
def dispense(self, vol: float, location :Union[Well, Tuple, Location], s:int = 2000):
"""Moves the pipette to the specified location and dispenses the desired volume of liquid
Expand Down Expand Up @@ -271,6 +273,7 @@ def dispense(self, vol: float, location :Union[Well, Tuple, Location], s:int = 2
self._machine.move_to(z=z)
self._dispense(vol, s=s)

@requires_active_tool
@tip_check
def transfer(self, vol: float, source_well: Union[Well, Tuple, Location],
destination_well :Union[Well, Tuple, Location] , s:int = 3000,
Expand Down Expand Up @@ -353,7 +356,8 @@ def transfer(self, vol: float, source_well: Union[Well, Tuple, Location],
# if new_tip == 'always':

#TODO: need to add new_tip option!


@requires_active_tool
@tip_check
def blowout(self, s : int = 3000):
"""Blows out any remaining liquid in the pipette tip
Expand All @@ -367,6 +371,7 @@ def blowout(self, s : int = 3000):
self._machine.move_to(v = self.blowout_position, s=s)
self.prime()

@requires_active_tool
@tip_check
def air_gap(self, vol):
"""Moves the plunger upwards to aspirate air into the pipette tip
Expand All @@ -381,6 +386,7 @@ def air_gap(self, vol):
self._machine.move_to(z = well.top_ + 20)
self._machine.move(v= -1*dv)

@requires_active_tool
@tip_check
def mix(self, vol: float, n: int, s: int = 5000):
"""Mixes liquid by alternating aspirate and dispense steps for the specified number of times
Expand All @@ -406,6 +412,7 @@ def mix(self, vol: float, n: int, s: int = 5000):
self.prime(s=s)

## In progress (2023-10-12) To test
@requires_active_tool
@tip_check
def stir(self, n_times: int = 1, height: float= None):
"""Stirs the liquid in the current well by moving the pipette tip in a circular motion
Expand Down Expand Up @@ -484,6 +491,7 @@ def add_tiprack(self, tiprack: Union[Labware, list]):

self.first_available_tip = self.tipiterator.next()

@requires_active_tool
def _pickup_tip(self, z):
"""Moves the Jubilee Z-axis upwards to pick up a pipette tip and stops once the tip sensor is triggered
Expand All @@ -496,7 +504,8 @@ def _pickup_tip(self, z):
else:
raise ToolStateError("Error: Pipette already equipped with a tip.")
#TODO: Should this be an error or a warning?


@requires_active_tool
def pickup_tip(self, tip_ : Union[Well, Tuple] = None):
"""Moves the pipette to the specified location and picks up a tip
Expand Down Expand Up @@ -524,6 +533,7 @@ def pickup_tip(self, tip_ : Union[Well, Tuple] = None):

#TODO: This should probably iterate the next available tip so that if you use a tip then replace it, you have to manually specify to go use that tip again rather than it just getting picked up.

@requires_active_tool
def return_tip(self, location: Well = None):
"""Returns the pipette tip to the either the specified location or to where the tip was picked up from
Expand All @@ -544,7 +554,7 @@ def return_tip(self, location: Well = None):
self.has_tip = False
self.update_z_offset(tip=False)


@requires_active_tool
def _drop_tip(self):
"""Moves the plunger to eject the pipette tip
Expand All @@ -560,6 +570,7 @@ def increment_tip(self):
"""
self.first_available_tip = self.tipiterator.next()

@requires_active_tool
@tip_check
def drop_tip(self, location: Union[Well, Tuple]):
"""Moves the pipette to the specified location and drops the pipette tip
Expand Down
17 changes: 11 additions & 6 deletions science_jubilee/tools/Syringe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .Tool import Tool, ToolStateError
from .Tool import Tool, ToolStateError, ToolConfigurationError, requires_active_tool
from science_jubilee.labware.Labware import Labware, Well
from typing import Tuple, Union
import warnings
Expand Down Expand Up @@ -55,23 +55,26 @@ def check_bounds(self, pos):
"""Disallow commands outside of the syringe's configured range"""
if pos > self.max_range or pos < self.min_range:
raise ToolStateError(f"Error: {pos} is out of bounds for the syringe!")


@requires_active_tool
def _aspirate(self, vol: float, s: int = 2000):
"""Aspirate a certain number of milliliters."""
de = vol * -1 * self.mm_to_ml
pos = self._machine.get_position()
end_pos = float(pos[self.e_drive]) + de
self.check_bounds(end_pos)
self._machine.move(de=de, wait = True)


@requires_active_tool
def _dispense(self, vol, s: int = 2000):
"""Dispense a certain number of milliliters."""
de = vol * self.mm_to_ml
pos = self._machine.get_position()
end_pos = float(pos[self.e_drive]) + de
self.check_bounds(end_pos)
self._machine.move(de=de, wait = True)


@requires_active_tool
def aspirate(
self,
vol: float,
Expand All @@ -95,7 +98,8 @@ def aspirate(
self._machine.move_to(x=x, y=y)
self._machine.move_to(z=z)
self._aspirate(vol, s=s)


@requires_active_tool
def dispense(
self,
vol: float,
Expand All @@ -121,7 +125,8 @@ def dispense(
self._machine.move_to(x=x, y=y)
self._machine.move_to(z=z)
self._dispense(vol, s=s)


@requires_active_tool
def transfer(
self,
vol: float,
Expand Down
15 changes: 14 additions & 1 deletion science_jubilee/tools/Tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self, index, name, **kwargs):
raise ToolConfigurationError("Incorrect usage: load_tool(<tool_number>, <name>, **kwargs)")
self.index = index
self.name = name
self.is_active_tool = False

for k,v in kwargs.items():
setattr(self, k, v )

Expand All @@ -27,4 +29,15 @@ def post_load(self):
#add a park tool method that every tool config can define to do things that need to be done pre or post parking
#ex: make sure pipette has dropped tips before parking



def requires_active_tool(func):
"""Decorator to ensure that a tool cannot complete an action unless it is the
current active tool.
"""
# print('check')
def wrapper(self, *args, **kwargs):
if self.is_active_tool == False:
raise ToolStateError (f"Error: Tool {self.name} is not the current `Active Tool`. Cannot perform this action")
else:
func(self,*args, **kwargs)
return wrapper
5 changes: 3 additions & 2 deletions science_jubilee/tools/WebCamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from labware.Labware import Well, Location
from typing import Tuple, Union
from .Tool import Tool
from .Tool import Tool, requires_active_tool


class Camera(Tool):
Expand Down Expand Up @@ -50,7 +50,7 @@ def _getxyz(location: Union[Well, Tuple, Location]):

return x,y,z


@requires_active_tool
def _capture_image(self, timeout = 10):
"""
Capture image from raspberry pi and write to file
Expand All @@ -64,6 +64,7 @@ def _capture_image(self, timeout = 10):

return response.content

@requires_active_tool
def capture_image(self, location: Union[Well, Tuple]):

x, y, z = self._getxyz(location)
Expand Down

0 comments on commit 3c3f11c

Please sign in to comment.