Skip to content

Commit

Permalink
runs
Browse files Browse the repository at this point in the history
  • Loading branch information
WT-MM committed Jan 26, 2025
1 parent 8c1f2c9 commit 2ec3e46
Showing 1 changed file with 50 additions and 49 deletions.
99 changes: 50 additions & 49 deletions scripts/walk.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import pykos
from pykos import KOS
from scipy.spatial.transform import Rotation as R
from tqdm import tqdm

# Ordered joint names for policy (legs only, top-down, left-right order)
JOINT_NAME_LIST = [
Expand Down Expand Up @@ -89,11 +88,11 @@ def __init__(self, joint_names: List[str], joint_signs: Dict[str, float]):

async def offset_in_place(self, kos: KOS, joint_names: List[str]) -> None:
"""Capture current position as zero offset."""
# Get current joint positions
# Get current joint positions (in degrees)
states = await kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in joint_names])
current_positions = {name: states.states[i].position for i, name in enumerate(joint_names)}

# Store negative of current positions as offsets
# Store negative of current positions as offsets (in degrees)
self.joint_offsets = {name: -pos for name, pos in current_positions.items()}

# Store IMU offset
Expand All @@ -103,23 +102,25 @@ async def offset_in_place(self, kos: KOS, joint_names: List[str]) -> None:

async def get_obs(self, kos: KOS) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Get robot state with offset compensation."""
# Get joint states
states = await kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in JOINT_NAME_LIST])
# Batch state requests
states, imu_data = await asyncio.gather(
kos.actuator.get_actuators_state([JOINT_NAME_TO_ID[name] for name in JOINT_NAME_LIST]),
kos.imu.get_euler_angles()
)

# Apply offsets and signs to positions
# Apply offsets and signs to positions and convert to radians
q = np.array([
(states.states[i].position + self.joint_offsets[name]) * self.joint_signs[name]
np.deg2rad((states.states[i].position + self.joint_offsets[name]) * self.joint_signs[name])
for i, name in enumerate(JOINT_NAME_LIST)
], dtype=np.float32)

# Apply signs to velocities
# Apply signs to velocities and convert to radians
dq = np.array([
states.states[i].velocity * self.joint_signs[name]
np.deg2rad(states.states[i].velocity * self.joint_signs[name])
for i, name in enumerate(JOINT_NAME_LIST)
], dtype=np.float32)

# Get IMU data with offset compensation
imu_data = await kos.imu.get_euler_angles()
# Process IMU data with offset compensation
current_quat = R.from_euler('xyz', [imu_data.roll, imu_data.pitch, imu_data.yaw], degrees=True).as_quat()
if self.orn_offset is not None:
# Apply the offset by quaternion multiplication
Expand All @@ -135,8 +136,10 @@ async def get_obs(self, kos: KOS) -> Tuple[np.ndarray, np.ndarray, np.ndarray, n
return q, dq, quat, gvec

def apply_command(self, position: float, joint_name: str) -> float:
"""Apply offset and sign to outgoing command."""
return (position - self.joint_offsets[joint_name]) * self.joint_signs[joint_name]
"""Apply sign first, then offset to outgoing command. Convert from radians to degrees."""
# Convert from radians to degrees since position from policy is in radians
position_deg = np.rad2deg(position)
return position_deg * self.joint_signs[joint_name] - self.joint_offsets[joint_name]

async def pd_control(
kos: KOS,
Expand Down Expand Up @@ -237,8 +240,8 @@ async def run_robot(
joint_id = JOINT_NAME_TO_ID[joint_name]
position = robot_state.apply_command(float(target_q[i] + default[i]), joint_name)
commands.append({"actuator_id": joint_id, "position": position})
await kos.actuator.command_actuators(commands)

# await kos.actuator.command_actuators(commands)
# print(gvec)
sleep = model_info["policy_dt"] - (time.time() - process_start)
if sleep > 0:
await asyncio.sleep(sleep)
Expand All @@ -258,43 +261,41 @@ async def main():
parser = argparse.ArgumentParser(description="Real robot walking deployment script.")
parser.add_argument("--load_model", type=str, required=True, help="Path to policy model")
parser.add_argument("--keyboard_use", action="store_true", help="Enable keyboard control")
parser.add_argument("--robot_ip", type=str, default="100.69.185.128", help="Robot IP address")
parser.add_argument("--ip", type=str, default="localhost", help="Robot IP address")
args = parser.parse_args()

# Initialize KOS and policy
kos = KOS(ip=args.robot_ip)
policy = ONNXModel(args.load_model)

# Get model info from policy metadata
metadata = policy.get_metadata()
model_info = {
"num_actions": metadata["num_actions"],
"num_observations": metadata["num_observations"],
"robot_effort": metadata["robot_effort"],
"robot_stiffness": metadata["robot_stiffness"],
"robot_damping": metadata["robot_damping"],
"tau_factor": metadata["tau_factor"],
"policy_dt": metadata["sim_dt"] * metadata["sim_decimation"],
}

breakpoint()

# Initialize velocity commands
global x_vel_cmd, y_vel_cmd, yaw_vel_cmd
if args.keyboard_use:
x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.0, 0.0, 0.0
pygame.init()
pygame.display.set_caption("Robot Control")
else:
x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.5, 0.0, 0.0

# Run robot control
await run_robot(
kos=kos,
policy=policy,
model_info=model_info,
keyboard_use=args.keyboard_use,
)
async with KOS(ip=args.ip) as kos:
policy = ONNXModel(args.load_model)

# Get model info from policy metadata
metadata = policy.get_metadata()
model_info = {
"num_actions": metadata["num_actions"],
"num_observations": metadata["num_observations"],
"robot_effort": metadata["robot_effort"],
"robot_stiffness": metadata["robot_stiffness"],
"robot_damping": metadata["robot_damping"],
"tau_factor": metadata["tau_factor"],
"policy_dt": metadata["sim_dt"] * metadata["sim_decimation"],
}

# Initialize velocity commands
global x_vel_cmd, y_vel_cmd, yaw_vel_cmd
if args.keyboard_use:
x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.0, 0.0, 0.0
pygame.init()
pygame.display.set_caption("Robot Control")
else:
x_vel_cmd, y_vel_cmd, yaw_vel_cmd = 0.5, 0.0, 0.0

# Run robot control
await run_robot(
kos=kos,
policy=policy,
model_info=model_info,
keyboard_use=args.keyboard_use,
)

if __name__ == "__main__":
asyncio.run(main())

0 comments on commit 2ec3e46

Please sign in to comment.