diff --git a/scripts/walk.py b/scripts/walk.py index 1816e32..f6f715e 100644 --- a/scripts/walk.py +++ b/scripts/walk.py @@ -13,7 +13,6 @@ import pykos from pykos import KOS from scipy.spatial.transform import Rotation as R -from tqdm import tqdm # Ordered joint names for policy (legs only, top-down, left-right order) JOINT_NAME_LIST = [ @@ -89,11 +88,11 @@ def __init__(self, joint_names: List[str], joint_signs: Dict[str, float]): async def offset_in_place(self, kos: KOS, joint_names: List[str]) -> None: """Capture current position as zero offset.""" - # Get current joint positions + # Get current joint positions (in degrees) states = await kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in joint_names]) current_positions = {name: states.states[i].position for i, name in enumerate(joint_names)} - # Store negative of current positions as offsets + # Store negative of current positions as offsets (in degrees) self.joint_offsets = {name: -pos for name, pos in current_positions.items()} # Store IMU offset @@ -103,23 +102,25 @@ async def offset_in_place(self, kos: KOS, joint_names: List[str]) -> None: async def get_obs(self, kos: KOS) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Get robot state with offset compensation.""" - # Get joint states - states = await kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in JOINT_NAME_LIST]) + # Batch state requests + states, imu_data = await asyncio.gather( + kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in JOINT_NAME_LIST]), + kos.imu.get_euler_angles() + ) - # Apply offsets and signs to positions + # Apply offsets and signs to positions and convert to radians q = np.array([ - (states.states[i].position + self.joint_offsets[name]) * self.joint_signs[name] + np.deg2rad((states.states[i].position + self.joint_offsets[name]) * self.joint_signs[name]) for i, name in enumerate(JOINT_NAME_LIST) ], dtype=np.float32) - # Apply signs to velocities + # Apply signs to velocities and convert to radians dq = np.array([ - states.states[i].velocity * self.joint_signs[name] + np.deg2rad(states.states[i].velocity * self.joint_signs[name]) for i, name in enumerate(JOINT_NAME_LIST) ], dtype=np.float32) - # Get IMU data with offset compensation - imu_data = await kos.imu.get_euler_angles() + # Process IMU data with offset compensation current_quat = R.from_euler('xyz', [imu_data.roll, imu_data.pitch, imu_data.yaw], degrees=True).as_quat() if self.orn_offset is not None: # Apply the offset by quaternion multiplication @@ -135,8 +136,10 @@ async def get_obs(self, kos: KOS) -> Tuple[np.ndarray, np.ndarray, np.ndarray, n return q, dq, quat, gvec def apply_command(self, position: float, joint_name: str) -> float: - """Apply offset and sign to outgoing command.""" - return (position - self.joint_offsets[joint_name]) * self.joint_signs[joint_name] + """Apply sign first, then offset to outgoing command. Convert from radians to degrees.""" + # Convert from radians to degrees since position from policy is in radians + position_deg = np.rad2deg(position) + return position_deg * self.joint_signs[joint_name] - self.joint_offsets[joint_name] async def pd_control( kos: KOS, @@ -237,8 +240,8 @@ async def run_robot( joint_id = JOINT_NAME_TO_ID[joint_name] position = robot_state.apply_command(float(target_q[i] + default[i]), joint_name) commands.append({"actuator_id": joint_id, "position": position}) - await kos.actuator.command_actuators(commands) - + # await kos.actuator.command_actuators(commands) + # print(gvec) sleep = model_info["policy_dt"] - (time.time() - process_start) if sleep > 0: await asyncio.sleep(sleep) @@ -258,43 +261,41 @@ async def main(): parser = argparse.ArgumentParser(description="Real robot walking deployment script.") parser.add_argument("--load_model", type=str, required=True, help="Path to policy model") parser.add_argument("--keyboard_use", action="store_true", help="Enable keyboard control") - parser.add_argument("--robot_ip", type=str, default="100.69.185.128", help="Robot IP address") + parser.add_argument("--ip", type=str, default="localhost", help="Robot IP address") args = parser.parse_args() # Initialize KOS and policy - kos = KOS(ip=args.robot_ip) - policy = ONNXModel(args.load_model) - - # Get model info from policy metadata - metadata = policy.get_metadata() - model_info = { - "num_actions": metadata["num_actions"], - "num_observations": metadata["num_observations"], - "robot_effort": metadata["robot_effort"], - "robot_stiffness": metadata["robot_stiffness"], - "robot_damping": metadata["robot_damping"], - "tau_factor": metadata["tau_factor"], - "policy_dt": metadata["sim_dt"] * metadata["sim_decimation"], - } - - breakpoint() - - # Initialize velocity commands - global x_vel_cmd, y_vel_cmd, yaw_vel_cmd - if args.keyboard_use: - x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.0, 0.0, 0.0 - pygame.init() - pygame.display.set_caption("Robot Control") - else: - x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.5, 0.0, 0.0 - - # Run robot control - await run_robot( - kos=kos, - policy=policy, - model_info=model_info, - keyboard_use=args.keyboard_use, - ) + async with KOS(ip=args.ip) as kos: + policy = ONNXModel(args.load_model) + + # Get model info from policy metadata + metadata = policy.get_metadata() + model_info = { + "num_actions": metadata["num_actions"], + "num_observations": metadata["num_observations"], + "robot_effort": metadata["robot_effort"], + "robot_stiffness": metadata["robot_stiffness"], + "robot_damping": metadata["robot_damping"], + "tau_factor": metadata["tau_factor"], + "policy_dt": metadata["sim_dt"] * metadata["sim_decimation"], + } + + # Initialize velocity commands + global x_vel_cmd, y_vel_cmd, yaw_vel_cmd + if args.keyboard_use: + x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.0, 0.0, 0.0 + pygame.init() + pygame.display.set_caption("Robot Control") + else: + x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.5, 0.0, 0.0 + + # Run robot control + await run_robot( + kos=kos, + policy=policy, + model_info=model_info, + keyboard_use=args.keyboard_use, + ) if __name__ == "__main__": asyncio.run(main())