forked from 1191000606/calibration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrobot_method.py
162 lines (133 loc) · 5.18 KB
/
robot_method.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#! /usr/bin/env python3
###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2018 Kinova inc. All rights reserved.
#
# This software may be modified and distributed
# under the terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###
import time
from kortex_api.Exceptions.KServerException import KServerException
from kortex_api.autogen.messages import Base_pb2
import threading
def forward_kinematics(base):
try:
input_joint_angles = base.GetMeasuredJointAngles()
except KServerException as ex:
print("Unable to get joint angles")
print("Error_code:{} , Sub_error_code:{} ".format(ex.get_error_code(), ex.get_error_sub_code()))
print("Caught expected error: {}".format(ex))
return False
try:
pose = base.ComputeForwardKinematics(input_joint_angles)
except KServerException as ex:
print("Unable to compute forward kinematics")
print("Error_code:{} , Sub_error_code:{} ".format(ex.get_error_code(), ex.get_error_sub_code()))
print("Caught expected error: {}".format(ex))
return False
return pose.x, pose.y, pose.z, pose.theta_x, pose.theta_y, pose.theta_z
# Maximum allowed waiting time during actions (in seconds)
TIMEOUT_DURATION = 30
# Create closure to set an event after an END or an ABORT
def check_for_end_or_abort(e):
"""Return a closure checking for END or ABORT notifications
Arguments:
e -- event to signal when the action is completed
(will be set when an END or ABORT occurs)
"""
def check(notification, e = e):
print("EVENT : " + \
Base_pb2.ActionEvent.Name(notification.action_event))
if notification.action_event == Base_pb2.ACTION_END \
or notification.action_event == Base_pb2.ACTION_ABORT:
e.set()
return check
def move_to_home_position(base):
# Make sure the arm is in Single Level Servoing mode
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
base.SetServoingMode(base_servo_mode)
# Move arm to ready position
print("Moving the arm to a safe position")
action_type = Base_pb2.RequestedActionType()
action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
action_list = base.ReadAllActions(action_type)
action_handle = None
for action in action_list.action_list:
if action.name == "Home":
action_handle = action.handle
if action_handle == None:
print("Can't reach safe position. Exiting")
return False
e = threading.Event()
notification_handle = base.OnNotificationActionTopic(
check_for_end_or_abort(e),
Base_pb2.NotificationOptions()
)
base.ExecuteActionFromReference(action_handle)
finished = e.wait(TIMEOUT_DURATION)
base.Unsubscribe(notification_handle)
if finished:
print("Safe position reached")
else:
print("Timeout on action notification wait")
return finished
def cartesian_action_movement(base, pose):
action = Base_pb2.Action()
action.name = "Example Cartesian action movement"
action.application_data = ""
cartesian_pose = action.reach_pose.target_pose
cartesian_pose.x = pose[0] # (meters)
cartesian_pose.y = pose[1] # (meters)
cartesian_pose.z = pose[2] # (meters)
cartesian_pose.theta_x = pose[3] # (degrees)
cartesian_pose.theta_y = pose[4] # (degrees)
cartesian_pose.theta_z = pose[5] # (degrees)
e = threading.Event()
notification_handle = base.OnNotificationActionTopic(
check_for_end_or_abort(e),
Base_pb2.NotificationOptions()
)
print("Executing action")
base.ExecuteAction(action)
print("Waiting for movement to finish ...")
finished = e.wait(TIMEOUT_DURATION)
base.Unsubscribe(notification_handle)
if not finished:
print("Timeout on action notification wait")
return finished
def gripper_close(base, value = 0.7):
gripper_command = Base_pb2.GripperCommand()
finger = gripper_command.gripper.finger.add()
# Close the gripper with position increments
gripper_command.mode = Base_pb2.GRIPPER_POSITION
position = 0.00
finger.finger_identifier = 1
while position < value:
finger.value = position
# print("Going to position {:0.2f}...".format(finger.value))
base.SendGripperCommand(gripper_command)
position += 0.1
time.sleep(1)
def gripper_open(base):
gripper_command = Base_pb2.GripperCommand()
finger = gripper_command.gripper.finger.add()
# Set speed to open gripper
gripper_command.mode = Base_pb2.GRIPPER_SPEED
finger.value = 0.1
base.SendGripperCommand(gripper_command)
gripper_request = Base_pb2.GripperRequest()
# Wait for reported position to be opened
gripper_request.mode = Base_pb2.GRIPPER_POSITION
while True:
gripper_measure = base.GetMeasuredGripperMovement(gripper_request)
if len (gripper_measure.finger):
# print("Current position is : {0}".format(gripper_measure.finger[0].value))
if gripper_measure.finger[0].value < 0.01:
break
else: # Else, no finger present in answer, end loop
break