Skip to content

Commit

Permalink
optimize my model (#1578)
Browse files Browse the repository at this point in the history
* learned identification stuff

* update versions

* add gitignore

* fix some lints

* module structure

* specify sensors

* remove readme

* try to fix render_best

* recorded_actuators -> recorded_actuator_positions

* format taplo

* ruff format
  • Loading branch information
schmidma authored Jan 29, 2025
1 parent 920204a commit a1b4d65
Show file tree
Hide file tree
Showing 12 changed files with 1,739 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.mcap
*.db
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[project]
name = "learned-identification"
version = "0.1.0"
description = "Add your description here"
requires-python = ">=3.12"
dependencies = [
"click>=8.1.8",
"cmaes>=0.11.1",
"mcap>=1.2.1",
"mediapy>=1.2.2",
"msgpack>=1.1.0",
"mujoco>=3.2.5",
"numpy>=2.1.3",
"optuna-dashboard>=0.17.0",
"optuna>=4.1.0",
"pymysql>=1.1.1",
"pyqt6>=6.7.1",
"tqdm>=4.67.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json

import click
import mujoco as mj
import optuna
from learned_identification.optimization import objective
from learned_identification.recording import (
load_recorded_actuator_positions,
load_recorded_sensors,
)

SENSORS = [
"head.yaw",
"head.pitch",
"left_leg.hip_yaw_pitch",
"left_leg.hip_roll",
"left_leg.hip_pitch",
"left_leg.knee_pitch",
"left_leg.ankle_pitch",
"left_leg.ankle_roll",
"right_leg.hip_roll",
"right_leg.hip_pitch",
"right_leg.knee_pitch",
"right_leg.ankle_pitch",
"right_leg.ankle_roll",
"left_arm.shoulder_pitch",
"left_arm.shoulder_roll",
"left_arm.elbow_yaw",
"left_arm.elbow_roll",
"left_arm.wrist_yaw",
"right_arm.shoulder_pitch",
"right_arm.shoulder_roll",
"right_arm.elbow_yaw",
"right_arm.elbow_roll",
"right_arm.wrist_yaw",
]


def render_trial(
spec_path: str,
recording_path: str,
study_name: str,
trial_number: str,
storage: str,
video_path: str,
) -> None:
spec = mj.MjSpec.from_file(spec_path)

recorded_actuator_positions = load_recorded_actuator_positions(
spec,
recording_path,
)
recorded_sensors = load_recorded_sensors(
spec,
recording_path,
)

study = optuna.load_study(
study_name=study_name,
storage=storage,
)

trial = (
study.best_trial
if trial_number == "best"
else study.trials[int(trial_number)]
)

print(f"Trial Number: {trial.number}")
print("Parameters:")
print(json.dumps(trial.params, indent=2))
print(f"Stored Value: {trial.value}")

value = objective(
trial,
spec,
recorded_actuator_positions,
recorded_sensors,
sensors=SENSORS,
video_path=video_path,
)
print(f"Computed Value: {value}")


@click.command()
@click.option("--spec", help="Path to the model specification file")
@click.option("--recording", help="Path to the mcap recording file")
@click.option("--study_name", help="Name of the study")
@click.option("--trial", help="Which trial (number or 'best')", default="best")
@click.option(
"--storage",
help="Path to the optuna database",
default="sqlite:///optuna.db",
)
@click.option(
"--video_path",
help="Path to save the video",
default="video.mp4",
)
def run(
spec: str,
recording: str,
study_name: str,
trial: str,
storage: str,
video_path: str,
) -> None:
render_trial(
spec,
recording,
study_name,
trial,
storage,
video_path,
)


if __name__ == "__main__":
run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import multiprocessing

import click
from learned_identification.run_optimizer import run_optimization


@click.command()
@click.option(
"--spec",
"spec_path",
required=True,
help="Path to the model specification file",
)
@click.option(
"--recording",
"recording_path",
required=True,
help="Path to the mcap recording file",
)
@click.option("--study", help="Name of the study", required=True)
@click.option(
"--storage",
help="Path to the optuna database",
default="sqlite:///optuna.db",
)
@click.option("--jobs", help="Number of jobs to run", default=1, type=int)
def run_many(
spec_path: str,
recording_path: str,
study: str,
storage: str,
jobs: int,
) -> None:
if jobs == 1:
run_optimization(spec_path, recording_path, study, storage)
return

processes = []
for _ in range(jobs):
p = multiprocessing.Process(
target=run_optimization,
args=(spec_path, recording_path, study, storage),
)
p.start()
print(f"Started process {p.pid}")
processes.append(p)

for p in processes:
p.join()


if __name__ == "__main__":
run_many()
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from dataclasses import dataclass
from typing import Self

import mujoco as mj
import numpy as np
import numpy.typing as npt
import optuna


@dataclass
class ActuatorParameters:
gain_prm: npt.NDArray[np.float64]
dyn_prm: npt.NDArray[np.float64]
bias_prm: npt.NDArray[np.float64]

@classmethod
def suggest_position_actuator(
cls,
trial: optuna.Trial | optuna.trial.FrozenTrial,
name: str,
) -> Self:
kp = trial.suggest_float(f"{name}_kp", low=0.0, high=100.0)
kv = trial.suggest_float(f"{name}_kv", low=0.0, high=100.0)

return cls(
gain_prm=np.array([kp, 0.0, 0.0], dtype=np.float64),
dyn_prm=np.array([1.0, 0.0, 0.0], dtype=np.float64),
bias_prm=np.array([0.0, -kp, -kv], dtype=np.float64),
)

@classmethod
def suggest_from_trial(
cls,
trial: optuna.Trial,
name: str,
) -> Self:
"""Suggest actuator parameters from a trial.
Args:
trial: The Optuna trial.
name: The name of the actuator.
Returns:
The suggested actuator parameters.
"""
gain_prm = [
trial.suggest_float(
f"{name}_gain_prm_{i}",
low=-10.0,
high=10.0,
)
for i in range(3)
]
dyn_prm = [
trial.suggest_float(
f"{name}_dyn_prm_{i}",
low=-10.0,
high=10.0,
)
for i in range(3)
]
bias_prm = [
trial.suggest_float(
f"{name}_bias_prm_{i}",
low=-10.0,
high=10.0,
)
for i in range(3)
]
return cls(
gain_prm=np.array(gain_prm, dtype=np.float64),
dyn_prm=np.array(dyn_prm, dtype=np.float64),
bias_prm=np.array(bias_prm, dtype=np.float64),
)

def populate_actuator(
self,
actuator: mj.MjsActuator,
) -> None:
"""Populate the actuator with the parameters.
Args:
actuator: The actuator to populate.
"""
actuator.gainprm[:3] = self.gain_prm
actuator.dynprm[:3] = self.dyn_prm
actuator.biasprm[:3] = self.bias_prm
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from collections.abc import Sequence

import mujoco as mj
import numpy as np
import numpy.typing as npt
import optuna

from .actuator import ActuatorParameters
from .simulation import simulate_recording


class SimulationLengthError(Exception):
def __init__(self) -> None:
super().__init__(
"The number of simulated sensor data points does not match the "
"number of recorded sensor data points",
)


def populate_actuators(
spec: mj.MjSpec,
trial: optuna.Trial | optuna.trial.FrozenTrial,
) -> None:
for actuator in spec.actuators:
parameters = ActuatorParameters.suggest_position_actuator(
trial,
actuator.name,
)
parameters.populate_actuator(actuator)


def objective(
trial: optuna.Trial | optuna.trial.FrozenTrial,
spec: mj.MjSpec,
recorded_actuator_positions: npt.NDArray[np.float64],
recorded_sensors: npt.NDArray[np.float64],
*,
sensors: Sequence[str],
video_path: str | None = None,
) -> float:
populate_actuators(spec, trial)
simulated_sensor_data = simulate_recording(
spec,
recorded_actuator_positions,
sensors=sensors,
video_path=video_path,
)
if len(simulated_sensor_data) != len(recorded_sensors):
raise SimulationLengthError
squared_error = (simulated_sensor_data - recorded_sensors) ** 2
return squared_error.sum() / len(recorded_actuator_positions)
Loading

0 comments on commit a1b4d65

Please sign in to comment.