Skip to content

Commit

Permalink
pre-commit: Apply ruff fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisv committed Jan 7, 2025
1 parent d4fdd44 commit 5300ae4
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 77 deletions.
8 changes: 3 additions & 5 deletions bindings/python/pinocchio/shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

## In this file, some shortcuts are provided ##

from typing import Tuple

from . import WITH_HPP_FCL, WITH_HPP_FCL_BINDINGS
from . import pinocchio_pywrap_default as pin

Expand All @@ -15,7 +13,7 @@

def buildModelsFromUrdf(
filename, *args, **kwargs
) -> Tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
) -> tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
"""Parse the URDF file given in input and return a Pinocchio Model followed by corresponding GeometryModels of types specified by geometry_types, in the same order as listed.
Arguments:
- filename - name of the urdf file to load
Expand Down Expand Up @@ -63,7 +61,7 @@ def _buildModelsFromUrdf(
verbose=False,
meshLoader=None,
geometry_types=None,
) -> Tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
) -> tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
if geometry_types is None:
geometry_types = [pin.GeometryType.COLLISION, pin.GeometryType.VISUAL]

Expand Down Expand Up @@ -119,7 +117,7 @@ def createDatas(*models):

def buildModelsFromSdf(
filename, *args, **kwargs
) -> Tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
) -> tuple[pin.Model, pin.GeometryModel, pin.GeometryModel]:
"""Parse the Sdf file given in input and return a Pinocchio Model and a list of Constraint Models, followed by corresponding GeometryModels of types specified by geometry_types, in the same order as listed.
Arguments:
- filename - name of the urdf file to load
Expand Down
12 changes: 6 additions & 6 deletions bindings/python/pinocchio/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ def fromListToVectorOfString(items):


__all__ = [
"np",
"npl",
"eye",
"zero",
"rand",
"fromListToVectorOfString",
"isapprox",
"matrixToRpy",
"mprint",
"np",
"npToTTuple",
"npToTuple",
"npl",
"rand",
"rotate",
"rpyToMatrix",
"matrixToRpy",
"fromListToVectorOfString",
"zero",
]
22 changes: 11 additions & 11 deletions bindings/python/pinocchio/visualize/meshcat_visualizer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings
from pathlib import Path
from typing import ClassVar, List
from typing import ClassVar

import numpy as np

Expand All @@ -21,9 +21,9 @@

# DaeMeshGeometry
import xml.etree.ElementTree as Et
from typing import Any, Dict, Optional, Set, Union
from typing import Any, Optional, Union

MsgType = Dict[str, Union[str, bytes, bool, float, "MsgType"]]
MsgType = dict[str, Union[str, bytes, bool, float, "MsgType"]]

try:
import hppfcl
Expand Down Expand Up @@ -110,7 +110,7 @@ def lower(self, object_data: Any) -> MsgType:
}

class DaeMeshGeometry(mg.ReferenceSceneElement):
def __init__(self, dae_path: str, cache: Optional[Set[str]] = None) -> None:
def __init__(self, dae_path: str, cache: Optional[set[str]] = None) -> None:
"""Load Collada files with texture images.
Inspired from
https://gist.github.com/danzimmerman/a392f8eadcf1166eb5bd80e3922dbdc5
Expand All @@ -131,7 +131,7 @@ def __init__(self, dae_path: str, cache: Optional[Set[str]] = None) -> None:
self.dae_raw = text_file.read()

# Parse the image resource in Collada file
img_resource_paths: List[Path] = []
img_resource_paths: list[Path] = []
img_lib_element = Et.parse(dae_path).find(
"{http://www.collada.org/2005/11/COLLADASchema}library_images"
)
Expand All @@ -143,7 +143,7 @@ def __init__(self, dae_path: str, cache: Optional[Set[str]] = None) -> None:
]

# Convert textures to data URL for Three.js ColladaLoader to load them
self.img_resources: Dict[str, str] = {}
self.img_resources: dict[str, str] = {}
for img_path in img_resource_paths:
img_key = str(img_path)
# Return empty string if already in cache
Expand All @@ -164,7 +164,7 @@ def __init__(self, dae_path: str, cache: Optional[Set[str]] = None) -> None:
img_uri = f"data:image/png;base64,{img_data.decode('utf-8')}"
self.img_resources[img_key] = img_uri

def lower(self) -> Dict[str, Any]:
def lower(self) -> dict[str, Any]:
"""Pack data into a dictionary of the format that must be passed to
`Visualizer.window.send`.
"""
Expand Down Expand Up @@ -1112,10 +1112,10 @@ def drawFrameVelocities(self, frame_id: int, v_scale=0.2, color=FRAME_VEL_COLOR)

def _draw_vectors_from_frame(
self,
vecs: List[np.ndarray],
frame_ids: List[int],
vec_names: List[str],
colors: List[int],
vecs: list[np.ndarray],
frame_ids: list[int],
vec_names: list[str],
colors: list[int],
):
"""Draw vectors extending from given frames."""
import meshcat.geometry as mg
Expand Down
3 changes: 1 addition & 2 deletions development/scripts/misc/common_symbols.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import itertools
import pathlib
import subprocess
import typing


def generate_symbols(shared_library: pathlib.Path) -> typing.Set[str]:
def generate_symbols(shared_library: pathlib.Path) -> set[str]:
# Show symbol
# -D: Dynamic
# -C: Demangled
Expand Down
38 changes: 22 additions & 16 deletions doc/d-practical-exercises/src/continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
import tflearn
from pendulum import Pendulum

### --- Random seed
# --- Random seed
RANDOM_SEED = int((time.time() % 10) * 1000)
print("Seed = %d" % RANDOM_SEED)
print(f"Seed = {RANDOM_SEED}")
np.random.seed(RANDOM_SEED)
tf.set_random_seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
n_init = tflearn.initializations.truncated_normal(seed=RANDOM_SEED)
u_init = tflearn.initializations.uniform(minval=-0.003, maxval=0.003, seed=RANDOM_SEED)

### --- Hyper paramaters
# --- Hyper paramaters
NEPISODES = 100 # Max training steps
NSTEPS = 100 # Max episode length
QVALUE_LEARNING_RATE = 0.001 # Base learning rate for the Q-value Network
Expand All @@ -35,13 +35,13 @@
BATCH_SIZE = 64 # Number of points to be fed in stochastic gradient
NH1 = NH2 = 250 # Hidden layer size

### --- Environment
# --- Environment
env = Pendulum(1) # Continuous pendulum
env.withSinCos = True # State is dim-3: (cosq,sinq,qdot) ...
NX = env.nobs # ... training converges with q,qdot with 2x more neurones.
NU = env.nu # Control is dim-1: joint torque

### --- Q-value and policy networks
# --- Q-value and policy networks


class QValueNetwork:
Expand All @@ -63,7 +63,8 @@ def __init__(self):
self.x = x # Network state <x> input in Q(x,u)
self.u = u # Network control <u> input in Q(x,u)
self.qvalue = qvalue # Network output <Q>
self.variables = tf.trainable_variables()[nvars:] # Variables to be trained
# Variables to be trained
self.variables = tf.trainable_variables()[nvars:]
self.hidens = [netx1, netx2, netu1, netu2] # Hidden layers for debug

def setupOptim(self):
Expand All @@ -75,7 +76,8 @@ def setupOptim(self):
self.qref = qref # Reference Q-values
self.optim = optim # Optimizer
self.gradient = (
gradient # Gradient of Q wrt the control dQ/du (for policy training)
# Gradient of Q wrt the control dQ/du (for policy training)
gradient
)
return self

Expand All @@ -101,7 +103,8 @@ def __init__(self):

self.x = x # Network input <x> in Pi(x)
self.policy = policy # Network output <Pi>
self.variables = tf.trainable_variables()[nvars:] # Variables to be trained
# Variables to be trained
self.variables = tf.trainable_variables()[nvars:]

def setupOptim(self):
qgradient = tf.placeholder(tf.float32, [None, NU])
Expand All @@ -110,7 +113,8 @@ def setupOptim(self):
zip(grad, self.variables)
)

self.qgradient = qgradient # Q-value gradient wrt control (input value)
# Q-value gradient wrt control (input value)
self.qgradient = qgradient
self.optim = optim # Optimizer
return self

Expand All @@ -122,7 +126,7 @@ def setupTargetAssign(self, nominalNet, tau=UPDATE_RATE):
return self


### --- Replay memory
# --- Replay memory
class ReplayItem:
def __init__(self, x, u, r, d, x2):
self.x = x
Expand All @@ -134,7 +138,7 @@ def __init__(self, x, u, r, d, x2):

replayDeque = deque()

### --- Tensor flow initialization
# --- Tensor flow initialization

policy = PolicyNetwork().setupOptim()
policyTarget = PolicyNetwork().setupTargetAssign(policy)
Expand Down Expand Up @@ -167,24 +171,26 @@ def rendertrial(maxiter=NSTEPS, verbose=True):
signal.SIGTSTP, lambda x, y: rendertrial()
) # Roll-out when CTRL-Z is pressed

### History of search
# History of search
h_rwd = []
h_qva = []
h_ste = []

### --- Training
# --- Training
for episode in range(1, NEPISODES):
x = env.reset().T
rsum = 0.0

for step in range(NSTEPS):
u = sess.run(policy.policy, feed_dict={policy.x: x}) # Greedy policy ...
# Greedy policy ...
u = sess.run(policy.policy, feed_dict={policy.x: x})
u += 1.0 / (1.0 + episode + step) # ... with noise
x2, r = env.step(u)
x2 = x2.T
done = False # pendulum scenario is endless.

replayDeque.append(ReplayItem(x, u, r, done, x2)) # Feed replay memory ...
# Feed replay memory ...
replayDeque.append(ReplayItem(x, u, r, done, x2))
if len(replayDeque) > REPLAY_SIZE:
replayDeque.popleft() # ... with FIFO forgetting.

Expand Down Expand Up @@ -260,7 +266,7 @@ def rendertrial(maxiter=NSTEPS, verbose=True):

# \\\END_FOR episode in range(NEPISODES)

print("Average reward during trials: %.3f" % (sum(h_rwd) / NEPISODES))
print(f"Average reward during trials: {sum(h_rwd) / NEPISODES:.3f}")
rendertrial()
plt.plot(np.cumsum(h_rwd) / range(1, NEPISODES))
plt.show()
7 changes: 4 additions & 3 deletions doc/d-practical-exercises/src/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def display(U, verbose=False):
env.display(x)
time.sleep(5e-2)
if verbose:
print("X%d" % i, x.T)
print(f"X{i}")


class CallBack:
Expand Down Expand Up @@ -66,8 +66,9 @@ def setWithDisplay(self, boolean=None):
callback = CallBack()
signal.signal(signal.SIGTSTP, lambda x, y: callback.setWithDisplay())

### --- OCP resolution
U0 = np.zeros(NSTEPS * env.nu) - env.umax # Initial guess for the control trajectory.
# --- OCP resolution
# Initial guess for the control trajectory.
U0 = np.zeros(NSTEPS * env.nu) - env.umax
bounds = (
[
[-env.umax, env.umax],
Expand Down
26 changes: 14 additions & 12 deletions doc/d-practical-exercises/src/qnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@
import tensorflow as tf
from dpendulum import DPendulum

### --- Random seed
# --- Random seed
RANDOM_SEED = int((time.time() % 10) * 1000)
print("Seed = %d" % RANDOM_SEED)
print(f"Seed = {RANDOM_SEED}")
np.random.seed(RANDOM_SEED)
tf.set_random_seed(RANDOM_SEED)

### --- Hyper paramaters
# --- Hyper paramaters
NEPISODES = 500 # Number of training episodes
NSTEPS = 50 # Max episode length
LEARNING_RATE = 0.1 # Step length in optimizer
DECAY_RATE = 0.99 # Discount factor

### --- Environment
# --- Environment
env = DPendulum()
NX = env.nx
NU = env.nu


### --- Q-value networks
# --- Q-value networks
class QValueNetwork:
def __init__(self):
x = tf.placeholder(shape=[1, NX], dtype=tf.float32)
Expand All @@ -44,11 +44,12 @@ def __init__(self):
self.x = x # Network input
self.qvalue = qvalue # Q-value as a function of x
self.u = u # Policy as a function of x
self.qref = qref # Reference Q-value at next step (to be set to l+Q o f)
# Reference Q-value at next step (to be set to l+Q o f)
self.qref = qref
self.optim = optim # Optimizer


### --- Tensor flow initialization
# --- Tensor flow initialization
tf.reset_default_graph()
qvalue = QValueNetwork()
sess = tf.InteractiveSession()
Expand Down Expand Up @@ -85,16 +86,17 @@ def rendertrial(maxiter=100):
signal.SIGTSTP, lambda x, y: rendertrial()
) # Roll-out when CTRL-Z is pressed

### --- History of search
# --- History of search
h_rwd = [] # Learning history (for plot).

### --- Training
# --- Training
for episode in range(1, NEPISODES):
x = env.reset()
rsum = 0.0

for step in range(NSTEPS - 1):
u = sess.run(qvalue.u, feed_dict={qvalue.x: onehot(x)})[0] # Greedy policy ...
# Greedy policy ...
u = sess.run(qvalue.u, feed_dict={qvalue.x: onehot(x)})[0]
u = disturb(u, episode) # ... with noise
x2, reward = env.step(u)

Expand All @@ -113,9 +115,9 @@ def rendertrial(maxiter=100):

h_rwd.append(rsum)
if not episode % 20:
print("Episode #%d done with %d sucess" % (episode, sum(h_rwd[-20:])))
print(f"Episode #{episode} done with {sum(h_rwd[-20:])} sucess")

print("Total rate of success: %.3f" % (sum(h_rwd) / NEPISODES))
print(f"Total rate of success: {sum(h_rwd) / NEPISODES:.3f}")
rendertrial()
plt.plot(np.cumsum(h_rwd) / range(1, NEPISODES))
plt.show()
Loading

0 comments on commit 5300ae4

Please sign in to comment.