diff --git a/spot_examples/README.md b/spot_examples/README.md index 3cd7d4ad1..a4bf68560 100644 --- a/spot_examples/README.md +++ b/spot_examples/README.md @@ -21,11 +21,15 @@ Once the driver has been started, different examples can be run with the followi ros2 run spot_examples ``` Follow the links on each of the node names for more detailed documentation about how each example works, possible command line arguments, and other safety considerations you may need to take into account. - * [`walk_forward`](docs/walk_forward.md): A simple example that shows how to use ROS 2 to send `RobotCommand` goals to the Spot driver. If you are new to Spot and ROS 2, we recommend starting here. * [`arm_simple`](docs/arm_simple.md): An example of converting the [BD Simple Arm Motion](https://dev.bostondynamics.com/python/examples/arm_simple/readme) example to use ROS 2. * [`send_inverse_kinematic_requests`](docs/send_inverse_kinematics_requests.md): An example that shows how to send inverse kinematics requests to the Spot Arm using ROS 2. -* [`batch_trajectory`](docs/batch_trajectory.md): An example that shows how to send very long trajectories to Spot using ROS2. +* [`batch_trajectory`](docs/batch_trajectory.md): An example that shows how to send very long trajectories to Spot using ROS 2. +* [`hello_spot`](docs/hello_spot.md): An example of converting the [BD Hello Spot]( +https://dev.bostondynamics.com/python/examples/hello_spot/readme +) example to use ROS 2, demonstrating basic movement and image streaming. +* [`arm_with_body_follow`](docs/arm_with_body_follow.md): An example that demonstrates simultaneous locomotion and manipulation using ROS 2. +* [`wasd`](docs/wasd.md): An example that offers basic teleoperation of Spot's locomotion and manipulation capabilities using ROS 2. ## Adding new examples To add examples that demonstrate other features of the Spot driver, create a new node `new_example.py` in [spot_examples](spot_examples) and add it to `setup.py`. Make sure to also write a documentation file (`new_example.md`) in [docs](docs) that explains how to run the example and how the code works, and link to it in this central README. diff --git a/spot_examples/docs/arm_with_body_follow.md b/spot_examples/docs/arm_with_body_follow.md new file mode 100644 index 000000000..a0ed2b236 --- /dev/null +++ b/spot_examples/docs/arm_with_body_follow.md @@ -0,0 +1,69 @@ +# arm_with_body_follow +An example that demonstrates simultaneous locomotion and manipulation using ROS 2. + +## Running the Example +For this example, make sure to position the robot with 2m of clear space in front of the robot, either sitting or standing, as it will stand, move its arm forward, then walk forward. After the Spot driver is running, you can start the example with: +```bash +ros2 run spot_examples arm_with_body_follow +``` +If you launched the driver with a namespace, use the following command instead: +```bash +ros2 run spot_examples arm_with_body_follow --robot +``` +The robot will stand up, place its arm in front of itself, then slowly walk forward. + +## Understanding the Code + +Now let's go through [the code](../spot_examples/arm_with_body_follow.py) and see what's happening. + +Similar to other examples, Hello Spot is designed as a class composed with a [ROS Node](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) that allows communication over [ROS topics](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Topics/Understanding-ROS2-Topics.html). + +```python +class ArmWithBodyFollow: + def __init__(self, robot_name: Optional[str] = None, node: Optional[Node] = None) -> None: + self.node = ros_scope.node() +``` +We first run `initialize_robot()`, which will claim then power on the robot: +```python + result = self.robot.command("claim") + ... + result = self.robot.command("power_on") +``` + +Then we run `move()`, which will compose a synchronized command that allows simultaneous execution of mobility and manipulation commands. + +To do this, we first craft a position for the arm to move to in front of the robot, using BD's `bodsyn.client.math_helpers` functions to assist in consecutive transformation composition: +```python + odom_T_body = self.tf_listener.lookup_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) + ... + odom_T_hand = odom_T_body_se3 * math_helpers.SE3Pose.from_proto(body_T_hand) +``` +then creating the arm_command: +```python + arm_command = RobotCommandBuilder.arm_pose_command( + odom_T_hand.x, + odom_T_hand.y, + odom_T_hand.z, + odom_T_hand.rot.w, + odom_T_hand.rot.x, + odom_T_hand.rot.y, + odom_T_hand.rot.z, + ODOM_FRAME_NAME, + seconds, + ) +``` + +as well as the `follow_arm_command()` that tells the robot's body to follow the arm. +```python + follow_arm_command = RobotCommandBuilder.follow_arm_command() +``` +We build the synchro command, convert, and send the goal. +```python + cmd = RobotCommandBuilder.build_synchro_command(follow_arm_command, arm_command) + ... + action_goal = RobotCommand.Goal() + convert(cmd, action_goal.command) + ... + self.robot_command_client.send_goal_and_wait("arm_with_body_follow", action_goal) +``` +This functionality is exposed to spot_ros2 thanks to our `convert()` function, which allows us to essentially send bosdyn protobuf commands using ROS 2. diff --git a/spot_examples/docs/hello_spot.md b/spot_examples/docs/hello_spot.md new file mode 100644 index 000000000..9fe297c3e --- /dev/null +++ b/spot_examples/docs/hello_spot.md @@ -0,0 +1,133 @@ +# hello_spot +An example of converting the [BD Hello Spot]( +https://dev.bostondynamics.com/python/examples/hello_spot/readme) example to use ROS 2, demonstrating basic movement and image streaming. + +## Running the Example +For this example, make sure to position the robot with 0.5m of clear space on all sides, either sitting or standing, as it will twist and move in place. After the Spot driver is running, you can start the example with: +```bash +ros2 run spot_examples hello_spot +``` +If you launched the driver with a namespace, use the following command instead: +```bash +ros2 run spot_examples hello_spot --robot +``` +The robot should stand (if not already), then twist its body along the yaw axis, then simultaneously bow and take a photo from its front left camera, which will be displayed in a pop up and saved locally on the PC from which spot_driver is run. + +## Understanding the Code + +Now let's go through [the code](../spot_examples/hello_spot.py) and see what's happening. + + +Similar to other examples, Hello Spot is designed as a class composed with a [ROS Node](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) that allows communication over [ROS topics](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Topics/Understanding-ROS2-Topics.html). +```python +class HelloSpot: + def __init__(self, robot_name: Optional[str] = None, node: Optional[Node] = None) -> None: + self.node = ros_scope.node() +``` +In this example, we will create a ROS 2 subscriber to listen to the images Spot takes from its front left camera, which it publishes to the `//camera/frontleft/image` topic. +```python + self.image_sub = self.node.create_subscription( + Image, namespace_with(robot_name, "camera/frontleft/image"), self.image_callback, 10 + ) +``` + + +We then instantiate both SimpleSpotCommander() and ActionClientWrapper() as ways to send both simple and more complex commands to Spot, respectively. +```python + self.robot = SimpleSpotCommander(self._robot_name, node) + self.robot_command_client = ActionClientWrapper(RobotCommand, 'robot_command', node) +``` +The wrapper we use here automatically waits for the action server to become available during construction. It also offers the `send_goal_and_wait` function that we’ll use later. + + +In `initialize_robot()` we first claim the lease of the robot using SimpleSpotCommander, +```python + result = self.robot.command("claim") +``` + +then power it on +```python + result = self.robot.command("power_on") +``` + +Our first commanded movement, `stand_default()`, is a simple stand: +```python + result = self.robot.command("stand") +``` +Our second movement, `stand_twisted()`, will request a stand with a 0.4 radian twist in yaw. For this more complicated movement, we will use the ActionClientWrapper we instantiated, as it will allow us to send more complex RobotCommand goals. We start by creating the SO(3) rotation matrix detailing this twist: +```python + footprint_R_body = bosdyn.geometry.EulerZXY(yaw=0.4, roll=0.0, pitch=0.0) +``` +then feed this rotation as an argument to our RobotCommandGoal +```python + cmd = RobotCommandBuilder.synchro_stand_command(footprint_R_body=footprint_R_body) + +``` +which is converted into a ROS 2 action goal, and synchronously sent to the robot: +```python + action_goal = RobotCommand.Goal() + convert(cmd, action_goal.command) + self.logger.info("Twisting robot") + self.robot_command_client.send_goal_and_wait("twisting_robot", action_goal) +``` + +Our final movement, `stand_3_pt_traj()` will have the robot follow a standing trajectory through thre points, resembling a bow. + +We first obtain the robot's current "world" frame to robot body frame transformation, which we will use as a starting point relative to which the next body positions will be based. +```python + odom_T_flat_body = self.tf_listener.lookup_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) +``` +Note: The "odom" frame is the most basic "world frame" proxy provided by BD's spot-sdk, and the "grav aligned body frame" represents the robot's body's current 7 DOF pose (orientation + position), but with the Z axis fixed vertically. + +Next, we create the transformations for the remaining 3 body pose "keyframes" +```python + flat_body_T_pose1 = math_helpers.SE3Pose(x=0.075, y=0, z=0, rot=math_helpers.Quat()) + flat_body_T_pose2 = math_helpers.SE3Pose(x=0.0, y=0, z=0, rot=math_helpers.Quat(w=0.9848, x=0, y=0.1736, z=0)) + flat_body_T_pose3 = math_helpers.SE3Pose(x=0.0, y=0, z=0, rot=math_helpers.Quat()) +``` + +convert each of these into protobuf form +```python + traj_point1 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose1).to_proto(), time_since_reference=seconds_to_duration(t1) + ) + traj_point2 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose2).to_proto(), time_since_reference=seconds_to_duration(t2) + ) + traj_point3 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose3).to_proto(), time_since_reference=seconds_to_duration(t3) + ) +``` +assemble into a single protobuf that is converted to a ROS 2 goal and synchronously sent to the robot +```python + traj = trajectory_pb2.SE3Trajectory(points=[traj_point1, traj_point2, traj_point3]) + + body_control = spot_command_pb2.BodyControlParams( + body_pose=spot_command_pb2.BodyControlParams.BodyPose( + root_frame_name=ODOM_FRAME_NAME, base_offset_rt_root=traj + ) + ) + + mobility_params = spot_command_pb2.MobilityParams(body_control=body_control) + + stand_command = RobotCommandBuilder.synchro_stand_command(params=mobility_params) + + stand_command_goal = RobotCommand.Goal() + convert(stand_command, stand_command_goal.command) + self.logger.info("Beginning absolute body control while standing.") + self.robot_command_client.send_goal_and_wait(action_name="hello_spot", goal=stand_command_goal, timeout_sec=10) +``` + + +Lastly, we call `_maybe_display_image()` and `_maybe_save_image()`, which display and save the latest image message stored in `self.latest_image_raw`. +```python + self.logger.info("Displaying image.") + self._maybe_display_image() + self._maybe_save_image() +``` +`self.latest_image_raw` is updated every time the spot_driver publishes a new image to the `//camera/frontleft/image` topic, thanks to the `image_sub` and `image_callback()` +```python + self.image_sub = self.node.create_subscription( + Image, namespace_with(robot_name, "camera/frontleft/image"), self.image_callback, 10 + ) +``` diff --git a/spot_examples/docs/wasd.md b/spot_examples/docs/wasd.md new file mode 100644 index 000000000..6b40b6fb3 --- /dev/null +++ b/spot_examples/docs/wasd.md @@ -0,0 +1,102 @@ +# wasd +An example that offers basic teleoperation of Spot's locomotion and manipulation capabilities using ROS 2. + +## Running the Example +For this example, make sure to position the robot with 2m of clear space on all sides, as you will be able to command walking in translation and rotation and arm stowing and unstowing. +```bash +ros2 run spot_examples wasd +``` +If you launched the driver with a namespace, use the following command instead: +```bash +ros2 run spot_examples wasd --robot +``` +The robot will stand up, and will execute any commands you send through the curses (text-based) interface in your terminal. + +## Understanding the Code + +Now let's go through [the code](../spot_examples/arm_with_body_follow.py) and see what's happening. + +Similar to other examples, Hello Spot is designed as a class composed with a [ROS Node](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) that allows communication over [ROS topics](https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Topics/Understanding-ROS2-Topics.html). + +```python +class WasdInterface: + ... + def __init__(self, robot_name: Optional[str] = None) -> None: + self.node = ros_scope.node() +``` +In the `__init__()` function of the `WasdInterface` class, we create a `_command_dictionary` member, which maps keypresses to member functions that command functions of the robot. For example, a key press on key "v" will call the `_sit()` member function: +```python + self._command_dictionary = { + ... + ord("v"): self._sit, + ... + } +``` + +The function `_sit()`, in turn, calls the robot's `/sit` service, by sending an asynchronous call of ROS 2 message type `Trigger` through the sit service client `self.cli_sit`. +```python + def _sit(self) -> None: + self.cli_sit.call_async(Trigger.Request()) +``` + +The sit service client, `self.cli_sit`, was instantiated in the constructor of WasdInterface along with several other service clients for services such as stand, power on, and stow arm +```python + self.cli_sit = self.node.create_client(Trigger, namespace_with(robot_name, "sit")) + self.cli_stand = self.node.create_client(Trigger, namespace_with(robot_name, "stand")) + ... + self.cli_power_on = self.node.create_client(Trigger, namespace_with(robot_name, "power_on")) + ... + self.cli_stow = self.node.create_client(Trigger, namespace_with(robot_name, "arm_stow")) +``` +each, of which, had assigned a corresponding keymapping in `_command_dictionary` for its member function +```python + ord("f"): self._stand, + ... + ord("p"): self._toggle_power, + ... + ord("j"): self._stow, + +``` + +To command a forward movement, the "w" key is mapped to the `_move_forward()` member function, +```python + ord("w"): self._move_forward, +``` +which, in turn, calls the helper function `_velocity_cmd_helper()` with details about the direction and magnitude of the velocity requested +```python + def _move_forward(self) -> None: + self._velocity_cmd_helper("move_forward", v_x=VELOCITY_BASE_SPEED) +``` + +The `_velocity_cmd_helper()` function takes 2D holonomic velocity requests (x, y translation and z rotation), crafts a ROS 2 `geometry_msgs/msg/Twist` message, and publishes to the robot's `/cmd_vel` topic. +```python + def _velocity_cmd_helper(self, desc: str = "", v_x: float = 0.0, v_y: float = 0.0, v_rot: float = 0.0) -> None: + twist = Twist() + twist.linear.x = v_x + twist.linear.y = v_y + twist.angular.z = v_rot + start_time = time.time() + while time.time() - start_time < VELOCITY_CMD_DURATION: + self.pub_cmd_vel.publish(twist) + time.sleep(0.01) + self.pub_cmd_vel.publish(Twist()) +``` +As shown above, we don't send this cmd_vel topic once; rather, we continuously send it unti VELOCITY_CMD_DURATION, at which point we send a default `Twist` message, where all velocity fields default to 0, stopping the robot. + +Lasty, we use ROS 2 subscribers to obtain and display information about the robot's state. The current battery state, for example, which includes battery state-of-charge percentage and charge/discharge state, is stored in `WasdInterface`'s `latest_battery_status` member, +```python + self.latest_battery_status: Optional[BatteryStateArray] = None +``` +which is updated every time the battery state ROS 2 subscriber sees a message published by the robot on the `//status/battery_states` topic +```python + self.sub_battery_state = self.node.create_subscription( + BatteryStateArray, namespace_with(robot_name, "status/battery_states"), self._status_battery_callback, 1 + ) +``` +and calls `_status_battery_callback()` +```python + def _status_battery_callback(self, msg: BatteryState) -> None: + self.latest_battery_status = msg +``` + +At each 'tick' of the curses UI refresh, we use `_battery_str()` to return a formatted string represntation of `self.latest_battery_status` to be printed on the screen. diff --git a/spot_examples/setup.py b/spot_examples/setup.py index 6da6939fd..2a1e1c69c 100644 --- a/spot_examples/setup.py +++ b/spot_examples/setup.py @@ -23,6 +23,9 @@ "arm_simple = spot_examples.arm_simple:main", "send_inverse_kinematics_requests = spot_examples.send_inverse_kinematics_requests:main", "batch_trajectory = spot_examples.batch_trajectory:main", + "hello_spot = spot_examples.hello_spot:main", + "arm_with_body_follow = spot_examples.arm_with_body_follow:main", + "wasd = spot_examples.wasd:main", ], }, ) diff --git a/spot_examples/spot_examples/arm_with_body_follow.py b/spot_examples/spot_examples/arm_with_body_follow.py new file mode 100644 index 000000000..131c2ea48 --- /dev/null +++ b/spot_examples/spot_examples/arm_with_body_follow.py @@ -0,0 +1,140 @@ +import argparse +import time +from typing import Optional + +import synchros2.process as ros_process +import synchros2.scope as ros_scope +from bosdyn.api import geometry_pb2 +from bosdyn.client import math_helpers +from bosdyn.client.frame_helpers import GRAV_ALIGNED_BODY_FRAME_NAME, ODOM_FRAME_NAME +from bosdyn.client.robot_command import RobotCommandBuilder +from bosdyn_msgs.conversions import convert +from rclpy.node import Node +from synchros2.action_client import ActionClientWrapper +from synchros2.tf_listener_wrapper import TFListenerWrapper +from synchros2.utilities import namespace_with + +from spot_msgs.action import RobotCommand # type: ignore + +from .simple_spot_commander import SimpleSpotCommander + + +class ArmWithBodyFollow: + def __init__(self, robot_name: Optional[str] = None, node: Optional[Node] = None) -> None: + self.node = ros_scope.node() + if self.node is None: + raise ValueError("no ROS 2 node available (did you use synchros2.process.main?)") + self.logger = self.node.get_logger() + + self.odom_frame_name = namespace_with(robot_name, ODOM_FRAME_NAME) + self.grav_aligned_body_frame_name = namespace_with(robot_name, GRAV_ALIGNED_BODY_FRAME_NAME) + self.tf_listener = TFListenerWrapper(node) + self.tf_listener.wait_for_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) + self.robot = SimpleSpotCommander(robot_name, node) + self.robot_command_client = ActionClientWrapper(RobotCommand, namespace_with(robot_name, "robot_command"), node) + + def initialize_robot(self) -> bool: + # Claim robot + self.logger.info("Claiming robot") + result = self.robot.command("claim") + if not result.success: + self.node.get_logger().error("Unable to claim robot message was " + result.message) + return False + self.logger.info("Claimed robot") + + # Power on robot + self.logger.info("Powering on robot") + result = self.robot.command("power_on") + if not result.success: + self.logger.error("Unable to power on robot message was " + result.message) + return False + return True + + def move(self) -> None: + self.logger.info("Standing robot up") + result = self.robot.command("stand") + if not result.success: + self.logger.error("Robot did not stand message was " + result.message) + return + self.logger.info("Successfully stood up.") + time.sleep(3) + + # Move the arm to a spot in front of the robot, and command the body to follow the hand. + # Build a position to move the arm to (in meters, relative to the body frame origin.) + x = 1.25 + y = 0 + z = 0.25 + hand_pos_rt_body = geometry_pb2.Vec3(x=x, y=y, z=z) + + # Rotation as a quaternion. + qw = 1 + qx = 0 + qy = 0 + qz = 0 + body_Q_hand = geometry_pb2.Quaternion(w=qw, x=qx, y=qy, z=qz) + + # Build the SE(3) pose of the desired hand position in the moving body frame. + body_T_hand = geometry_pb2.SE3Pose(position=hand_pos_rt_body, rotation=body_Q_hand) + + # Transform the desired from the moving body frame to the odom frame. + odom_T_body = self.tf_listener.lookup_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) + odom_T_body_se3 = math_helpers.SE3Pose( + odom_T_body.transform.translation.x, + odom_T_body.transform.translation.y, + odom_T_body.transform.translation.z, + math_helpers.Quat( + odom_T_body.transform.rotation.w, + odom_T_body.transform.rotation.x, + odom_T_body.transform.rotation.y, + odom_T_body.transform.rotation.z, + ), + ) + + odom_T_hand = odom_T_body_se3 * math_helpers.SE3Pose.from_proto(body_T_hand) + + # duration in seconds + seconds = 5 + + # Create the arm command. + arm_command = RobotCommandBuilder.arm_pose_command( + odom_T_hand.x, + odom_T_hand.y, + odom_T_hand.z, + odom_T_hand.rot.w, + odom_T_hand.rot.x, + odom_T_hand.rot.y, + odom_T_hand.rot.z, + ODOM_FRAME_NAME, + seconds, + ) + + # Tell the robot's body to follow the arm + follow_arm_command = RobotCommandBuilder.follow_arm_command() + + # Combine the arm and mobility commands into one synchronized command. + cmd = RobotCommandBuilder.build_synchro_command(follow_arm_command, arm_command) + + # Convert to ROS2 action + action_goal = RobotCommand.Goal() + convert(cmd, action_goal.command) + self.logger.info("Performing arm with body follow") + + # Send the command + self.robot_command_client.send_goal_and_wait("arm_with_body_follow", action_goal) + + +def cli() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument("--robot", type=str, default=None) + return parser + + +@ros_process.main(cli()) +def main(args: argparse.Namespace) -> None: + hello_spot = ArmWithBodyFollow(args.robot) + hello_spot.initialize_robot() + hello_spot.move() + + +if __name__ == "__main__": + main() diff --git a/spot_examples/spot_examples/hello_spot.py b/spot_examples/spot_examples/hello_spot.py new file mode 100644 index 000000000..4322178a0 --- /dev/null +++ b/spot_examples/spot_examples/hello_spot.py @@ -0,0 +1,227 @@ +import argparse +import os +import time +from typing import Optional + +import bosdyn.geometry +import cv2 +import synchros2.process as ros_process +import synchros2.scope as ros_scope +from bosdyn.api import trajectory_pb2 +from bosdyn.api.spot import robot_command_pb2 as spot_command_pb2 +from bosdyn.client import math_helpers +from bosdyn.client.frame_helpers import GRAV_ALIGNED_BODY_FRAME_NAME, ODOM_FRAME_NAME +from bosdyn.client.robot_command import RobotCommandBuilder +from bosdyn.util import seconds_to_duration +from bosdyn_msgs.conversions import convert +from cv_bridge import CvBridge +from rclpy.node import Node +from sensor_msgs.msg import Image +from synchros2.action_client import ActionClientWrapper +from synchros2.tf_listener_wrapper import TFListenerWrapper +from synchros2.utilities import namespace_with + +from spot_msgs.action import RobotCommand # type: ignore + +from .simple_spot_commander import SimpleSpotCommander + + +class HelloSpot: + def __init__(self, robot_name: Optional[str] = None, node: Optional[Node] = None) -> None: + self.node = ros_scope.node() + if self.node is None: + raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)") + self.logger = self.node.get_logger() + + self.image_sub = self.node.create_subscription( + Image, namespace_with(robot_name, "camera/frontleft/image"), self.image_callback, 10 + ) + self.image_sub # prevent unused variable warning + self.pause_image_update = False + + self.br = CvBridge() + + self.odom_frame_name = namespace_with(robot_name, ODOM_FRAME_NAME) + self.grav_aligned_body_frame_name = namespace_with(robot_name, GRAV_ALIGNED_BODY_FRAME_NAME) + self.tf_listener = TFListenerWrapper(node) + self.tf_listener.wait_for_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) + self.robot = SimpleSpotCommander(robot_name, node) + self.robot_command_client = ActionClientWrapper(RobotCommand, namespace_with(robot_name, "robot_command"), node) + + def initialize_robot(self) -> bool: + # Claim robot + self.logger.info("Claiming robot") + result = self.robot.command("claim") + if not result.success: + self.node.get_logger().error("Unable to claim robot message was " + result.message) + return False + self.logger.info("Claimed robot") + + # Power on robot + self.logger.info("Powering on robot") + result = self.robot.command("power_on") + if not result.success: + self.logger.error("Unable to power on robot message was " + result.message) + return False + return True + + def stand_default(self) -> bool: + self.logger.info("Standing robot up") + result = self.robot.command("stand") + if not result.success: + self.logger.error("Robot did not stand message was " + result.message) + return False + self.logger.info("Successfully stood up.") + time.sleep(3) + return True + + def stand_twisted(self) -> None: + # Tell the robot to stand in a twisted position. + # + # The RobotCommandBuilder constructs command messages, which are then + # issued to the robot using "robot_command" on the command client. + # + # In this example, the RobotCommandBuilder generates a stand command + # message with a non-default rotation in the footprint frame. The footprint + # frame is a gravity aligned frame with its origin located at the geometric + # center of the feet. The X axis of the footprint frame points forward along + # the robot's length, the Z axis points up aligned with gravity, and the Y + # axis is the cross-product of the two. + footprint_R_body = bosdyn.geometry.EulerZXY(yaw=0.4, roll=0.0, pitch=0.0) + + cmd = RobotCommandBuilder.synchro_stand_command(footprint_R_body=footprint_R_body) + action_goal = RobotCommand.Goal() + convert(cmd, action_goal.command) + self.logger.info("Twisting robot") + self.robot_command_client.send_goal_and_wait("twisting_robot", action_goal) + + self.logger.info("Robot standing twisted.") + time.sleep(3) + + def stand_3_pt_traj(self) -> None: + # Now compute an absolute desired position and orientation of the robot body origin. + # Use the frame helper class to compute the world to gravity aligned body frame transformation. + # Note, the robot_state used here was cached from before the above yaw stand command, + # so it contains the nominal stand pose. + # odom_T_flat_body = get_a_tform_b(robot_state.kinematic_state.transforms_snapshot, + # ODOM_FRAME_NAME, GRAV_ALIGNED_BODY_FRAME_NAME) + + ## spot_ros2 replacement + odom_T_flat_body = self.tf_listener.lookup_a_tform_b(self.odom_frame_name, self.grav_aligned_body_frame_name) + + odom_T_flat_body_se3 = math_helpers.SE3Pose( + odom_T_flat_body.transform.translation.x, + odom_T_flat_body.transform.translation.y, + odom_T_flat_body.transform.translation.z, + math_helpers.Quat( + odom_T_flat_body.transform.rotation.w, + odom_T_flat_body.transform.rotation.x, + odom_T_flat_body.transform.rotation.y, + odom_T_flat_body.transform.rotation.z, + ), + ) + + # Specify a trajectory to shift the body forward followed by looking down, then return to nominal. + # Define times (in seconds) for each point in the trajectory. + t1 = 2.5 + t2 = 5.0 + t3 = 7.5 + + # Specify the poses as transformations to the cached flat_body pose. + flat_body_T_pose1 = math_helpers.SE3Pose(x=0.075, y=0, z=0, rot=math_helpers.Quat()) + flat_body_T_pose2 = math_helpers.SE3Pose(x=0.0, y=0, z=0, rot=math_helpers.Quat(w=0.9848, x=0, y=0.1736, z=0)) + flat_body_T_pose3 = math_helpers.SE3Pose(x=0.0, y=0, z=0, rot=math_helpers.Quat()) + + # Build the points in the trajectory. + traj_point1 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose1).to_proto(), time_since_reference=seconds_to_duration(t1) + ) + traj_point2 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose2).to_proto(), time_since_reference=seconds_to_duration(t2) + ) + traj_point3 = trajectory_pb2.SE3TrajectoryPoint( + pose=(odom_T_flat_body_se3 * flat_body_T_pose3).to_proto(), time_since_reference=seconds_to_duration(t3) + ) + + # Build the trajectory proto by combining the points. + traj = trajectory_pb2.SE3Trajectory(points=[traj_point1, traj_point2, traj_point3]) + + body_control = spot_command_pb2.BodyControlParams( + body_pose=spot_command_pb2.BodyControlParams.BodyPose( + root_frame_name=ODOM_FRAME_NAME, base_offset_rt_root=traj + ) + ) + + mobility_params = spot_command_pb2.MobilityParams(body_control=body_control) + + stand_command = RobotCommandBuilder.synchro_stand_command(params=mobility_params) + + stand_command_goal = RobotCommand.Goal() + convert(stand_command, stand_command_goal.command) + self.logger.info("Beginning absolute body control while standing.") + self.robot_command_client.send_goal_and_wait(action_name="hello_spot", goal=stand_command_goal, timeout_sec=10) + self.logger.info("Finished absolute body control while standing.") + + self.logger.info("Displaying image.") + self._maybe_display_image() + self._maybe_save_image() + + self.logger.info("Goodbye, human!") + + def image_callback(self, image_raw: Image) -> None: + self.logger.debug("recieved image") + if self.pause_image_update: + self.logger.debug("pausing image update while displaying and saving current image.") + return + self.latest_image_raw = image_raw + + def _maybe_display_image(self, display_time: float = 3.0) -> None: + """Try to display image, if client has correct deps.""" + + self.pause_image_update = True # to ensure we display and save same image + + try: + image = self.br.imgmsg_to_cv2(self.latest_image_raw) + cv2.imshow("Hello, human!", image) + cv2.waitKey(0) + time.sleep(display_time) + except Exception as exc: + self.logger.warning("Exception thrown displaying image. %r", exc) + + def _maybe_save_image(self, path: Optional[str] = None) -> None: + """Try to save image, if client has correct deps.""" + + name = "hello-spot-img.jpg" + if path is not None and os.path.exists(path): + path = os.path.join(os.getcwd(), path) + name = os.path.join(path, name) + self.logger.info("Saving image to: %s", name) + else: + self.logger.info(f"Saving image to working directory as {name}") + try: + image = self.br.imgmsg_to_cv2(self.latest_image_raw) + cv2.imwrite(name, image) + + except Exception as exc: + self.logger.warning("Exception thrown saving image. %r", exc) + + self.pause_image_update = False # resume image updating + + +def cli() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument("--robot", type=str, default=None) + return parser + + +@ros_process.main(cli()) +def main(args: argparse.Namespace) -> None: + hello_spot = HelloSpot(args.robot) + hello_spot.initialize_robot() + hello_spot.stand_default() + hello_spot.stand_twisted() + hello_spot.stand_3_pt_traj() + + +if __name__ == "__main__": + main() diff --git a/spot_examples/spot_examples/wasd.py b/spot_examples/spot_examples/wasd.py new file mode 100644 index 000000000..77c7bdd53 --- /dev/null +++ b/spot_examples/spot_examples/wasd.py @@ -0,0 +1,374 @@ +"""WASD driving of robot.""" +import argparse +import curses +import os +import signal +import sys +import threading +import time +from types import FrameType +from typing import Literal, Optional, Type + +import synchros2.process as ros_process +import synchros2.scope as ros_scope +from bosdyn.client import ResponseError, RpcError +from geometry_msgs.msg import Twist +from std_srvs.srv import Trigger +from synchros2.action_client import ActionClientWrapper +from synchros2.utilities import namespace_with + +from spot_msgs.action import RobotCommand # type: ignore +from spot_msgs.msg import BatteryState, BatteryStateArray, PowerState # type: ignore + +from .simple_spot_commander import SimpleSpotCommander + +VELOCITY_BASE_SPEED = 0.3 # m/s +VELOCITY_BASE_ANGULAR = 0.5 # rad/sec +VELOCITY_CMD_DURATION = 1.0 # seconds +COMMAND_INPUT_RATE = 0.1 + + +class ExitCheck(object): + """A class to help exiting a loop, also capturing SIGTERM to exit the loop.""" + + def __init__(self) -> None: + self._kill_now = False + signal.signal(signal.SIGTERM, self._sigterm_handler) + signal.signal(signal.SIGINT, self._sigterm_handler) + + def __enter__(self) -> "ExitCheck": + return self + + def __exit__( + self, + _type: Optional[Type[BaseException]], + _value: Optional[BaseException], + _traceback: Optional[FrameType], + ) -> Literal[False]: + return False + + def _sigterm_handler(self, _signum: int, _frame: Optional[FrameType]) -> None: + self._kill_now = True + + def request_exit(self) -> None: + """Manually trigger an exit (rather than sigterm/sigint).""" + self._kill_now = True + + @property + def kill_now(self) -> bool: + """Return the status of the exit checker indicating if it should exit.""" + return self._kill_now + + +class WasdInterface: + """A curses interface for driving the robot.""" + + def __init__(self, robot_name: Optional[str] = None) -> None: + self.robot_name = robot_name + + self.node = ros_scope.node() + if self.node is None: + raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)") + + self.logger = self.node.get_logger() + + self._lock = threading.Lock() + print("Lock initialized") + self._command_dictionary = { + 27: self._stop, # ESC key + ord("\t"): self._quit_program, + ord("r"): self._self_right, + ord("p"): self._toggle_power, + ord("v"): self._sit, + ord("b"): self._battery_change_pose, + ord("f"): self._stand, + ord("w"): self._move_forward, + ord("s"): self._move_backward, + ord("a"): self._strafe_left, + ord("d"): self._strafe_right, + ord("q"): self._turn_left, + ord("e"): self._turn_right, + ord("u"): self._unstow, + ord("j"): self._stow, + } + self._locked_messages = ["", "", ""] # string: displayed message for user + self._exit_check: Optional[ExitCheck] = None + + # Stuff that is set in start() + self._robot_id = None + + self.pub_cmd_vel = self.node.create_publisher(Twist, namespace_with(robot_name, "cmd_vel"), 1) + + self.latest_power_state_status: Optional[PowerState] = None + self.latest_battery_status: Optional[BatteryStateArray] = None + + self.sub_status_power_state = self.node.create_subscription( + PowerState, namespace_with(robot_name, "status/power_states"), self._status_power_state_callback, 1 + ) + self.sub_battery_state = self.node.create_subscription( + BatteryStateArray, namespace_with(robot_name, "status/battery_states"), self._status_battery_callback, 1 + ) + + self.cli_self_right = self.node.create_client(Trigger, namespace_with(robot_name, "self_right")) + self.cli_sit = self.node.create_client(Trigger, namespace_with(robot_name, "sit")) + self.cli_stand = self.node.create_client(Trigger, namespace_with(robot_name, "stand")) + self.cli_stop = self.node.create_client(Trigger, namespace_with(robot_name, "stop")) + self.cli_stow = self.node.create_client(Trigger, namespace_with(robot_name, "arm_stow")) + self.cli_unstow = self.node.create_client(Trigger, namespace_with(robot_name, "arm_unstow")) + self.cli_power_off = self.node.create_client(Trigger, namespace_with(robot_name, "power_off")) + self.cli_power_on = self.node.create_client(Trigger, namespace_with(robot_name, "power_on")) + self.cli_rollover = self.node.create_client(Trigger, namespace_with(robot_name, "rollover")) + + self.robot = SimpleSpotCommander(robot_name, self.node) + self.robot_command_client = ActionClientWrapper( + RobotCommand, namespace_with(robot_name, "robot_command"), self.node + ) + + def start(self) -> bool: + # """Begin communication with the robot.""" + + # Claim robot + self.logger.info("Claiming robot") + result = self.robot.command("claim") + if not result.success: + self.logger.error("Unable to claim robot message was " + result.message) + return False + self.logger.info("Claimed robot") + + # Power on robot + self.logger.info("Powering on robot") + result = self.robot.command("power_on") + if not result.success: + self.logger.error("Unable to power on robot message was " + result.message) + return False + + self.logger.info("Standing robot up") + result = self.robot.command("stand") + if not result.success: + self.logger.error("Robot did not stand message was " + result.message) + return False + self.logger.info("Successfully stood up.") + time.sleep(3) + + return True + + def flush_buffer(self, stdscr: curses.window) -> None: + """Manually flush the curses input buffer""" + key = 0 + while key != -1: + key = stdscr.getch() + + def add_message(self, msg_text: str) -> None: + """Display the given message string to the user in the curses interface.""" + with self._lock: + self._locked_messages = [msg_text] + self._locked_messages[:-1] + + def message(self, idx: int) -> str: + """Grab one of the 3 last messages added.""" + with self._lock: + return self._locked_messages[idx] + + def _status_power_state_callback(self, msg: PowerState) -> None: + self.latest_power_state_status = msg + + def _status_battery_callback(self, msg: BatteryState) -> None: + self.latest_battery_status = msg + + def drive(self, stdscr: curses.window) -> None: + """User interface to control the robot via the passed-in curses screen interface object.""" + with ExitCheck() as self._exit_check: + stdscr.nodelay(True) # Don't block for user input. + stdscr.resize(26, 140) + stdscr.refresh() + + # for debug + curses.echo() + + # try: + while not self._exit_check.kill_now: + self._drive_draw(stdscr) + + try: + cmd = stdscr.getch() + # Do not queue up commands on client + self.flush_buffer(stdscr) + self._drive_cmd(cmd) + time.sleep(COMMAND_INPUT_RATE) + except Exception: + # On robot command fault, sit down safely before killing the program. + self.cli_sit.call_async(Trigger.Request()) + self.cli_power_off.call_async(Trigger.Request()) + time.sleep(2.0) + raise + + def _drive_draw(self, stdscr: curses.window) -> None: + """Draw the interface screen at each update.""" + stdscr.clear() # clear screen + stdscr.resize(26, 140) + stdscr.addstr(0, 0, f"robot name: {self.robot_name:20s}") + stdscr.addstr(2, 0, self._battery_str()) + stdscr.addstr(3, 0, self._power_state_str()) + for i in range(3): + stdscr.addstr(7 + i, 2, self.message(i)) + stdscr.addstr(10, 0, "commands: [tab]: quit, [p]: power ") + stdscr.addstr(11, 0, " [f]: stand, [r]: self-right ") + stdscr.addstr(12, 0, " [v]: sit, [b]: battery change ") + stdscr.addstr(13, 0, " [wasd]: directional strafing ") + stdscr.addstr(14, 0, " [qe]: turning, [ESC]: stop ") + stdscr.addstr(15, 0, " [u]: unstow arm, [j]: stow arm ") + stdscr.addstr(16, 0, "") + + stdscr.refresh() + + def _drive_cmd(self, key: int) -> None: + """Run user commands at each update.""" + try: + cmd_function = self._command_dictionary[key] + cmd_function() + + except KeyError: + if key and key != -1 and key < 256: + self.add_message(f"Unrecognized keyboard command: '{chr(key)}'") + + def _quit_program(self) -> None: + self._sit() + if self._exit_check is not None: + self._exit_check.request_exit() + + def _toggle_power(self) -> None: + power_state = self.latest_power_state_status + if power_state is None: + self.add_message("Could not toggle power because power state is unknown") + return + + if power_state.motor_power_state == PowerState.STATE_OFF: + self.cli_power_on.call_async(Trigger.Request()) + else: + self.cli_power_off.call_async(Trigger.Request()) + + def _self_right(self) -> None: + self.cli_self_right.call_async(Trigger.Request()) + + def _battery_change_pose(self) -> None: + self.cli_rollover.call_async(Trigger.Request()) + + def _sit(self) -> None: + self.cli_sit.call_async(Trigger.Request()) + + def _stand(self) -> None: + self.cli_stand.call_async(Trigger.Request()) + + def _move_forward(self) -> None: + self._velocity_cmd_helper("move_forward", v_x=VELOCITY_BASE_SPEED) + + def _move_backward(self) -> None: + self._velocity_cmd_helper("move_backward", v_x=-VELOCITY_BASE_SPEED) + + def _strafe_left(self) -> None: + self._velocity_cmd_helper("strafe_left", v_y=VELOCITY_BASE_SPEED) + + def _strafe_right(self) -> None: + self._velocity_cmd_helper("strafe_right", v_y=-VELOCITY_BASE_SPEED) + + def _turn_left(self) -> None: + self._velocity_cmd_helper("turn_left", v_rot=VELOCITY_BASE_ANGULAR) + + def _turn_right(self) -> None: + self._velocity_cmd_helper("turn_right", v_rot=-VELOCITY_BASE_ANGULAR) + + def _stop(self) -> None: + self.cli_stop.call_async(Trigger.Request()) + + def _velocity_cmd_helper(self, desc: str = "", v_x: float = 0.0, v_y: float = 0.0, v_rot: float = 0.0) -> None: + twist = Twist() + twist.linear.x = v_x + twist.linear.y = v_y + twist.angular.z = v_rot + start_time = time.time() + while time.time() - start_time < VELOCITY_CMD_DURATION: + self.pub_cmd_vel.publish(twist) + time.sleep(0.01) + self.pub_cmd_vel.publish(Twist()) + + def _stow(self) -> None: + self.cli_stow.call_async(Trigger.Request()) + + def _unstow(self) -> None: + self.cli_unstow.call_async(Trigger.Request()) + + def _power_state_str(self) -> str: + power_state = self.latest_power_state_status + if power_state is None: + return "latest_power_state_status is None" + + motor_power_state_str = { + PowerState.STATE_UNKNOWN: "Unknown", + PowerState.STATE_OFF: "Off", + PowerState.STATE_ON: "On", + PowerState.STATE_POWERING_ON: "Powering On", + PowerState.STATE_POWERING_OFF: "Powering Off", + PowerState.STATE_ERROR: "Error", + } + + shore_power_state_str = { + PowerState.STATE_UNKNOWN_SHORE_POWER: "Unknown Shore Power", + PowerState.STATE_ON_SHORE_POWER: "On Shore Power", + PowerState.STATE_OFF_SHORE_POWER: "Off Shore Power", + } + + motor_state = motor_power_state_str.get(power_state.motor_power_state, "Invalid State") + shore_state = shore_power_state_str.get(power_state.shore_power_state, "Invalid State") + + return f"motor power state: {motor_state}, shore power state: {shore_state}" + + def _battery_str(self) -> str: + battery_state_array = self.latest_battery_status + if not battery_state_array: + return "latest_battery_status is None" + battery_state = battery_state_array.battery_states[0] + status_str = { + BatteryState.STATUS_UNKNOWN: "Unknown", + BatteryState.STATUS_MISSING: "Missing", + BatteryState.STATUS_CHARGING: "Charging", + BatteryState.STATUS_DISCHARGING: "Discharging", + BatteryState.STATUS_BOOTING: "Booting", + } + + status = status_str.get(battery_state.status, "Invalid Status") + + percent = battery_state.charge_percentage + return f"battery: {percent}% ({status})" + + +def cli() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument("--robot", type=str, default=None) + return parser + + +@ros_process.main(cli()) +def main(args: argparse.Namespace) -> bool: + """Command-line interface.""" + + wasd_interface = WasdInterface(args.robot) + try: + wasd_interface.start() + except (ResponseError, RpcError): + wasd_interface.logger.error("Failed to initialize robot communication: TODO: print error") + return False + + try: + # try: + # Prevent curses from introducing a 1-second delay for ESC key + os.environ.setdefault("ESCDELAY", "0") + # Run wasd interface in curses mode, then restore terminal config. + curses.wrapper(wasd_interface.drive) + except Exception as e: + wasd_interface.logger.error("WASD has thrown an error:" + str(e)) + + return True + + +if __name__ == "__main__": + if not main(): + sys.exit(1)