import os
import warnings
from pathlib import Path
from copy import deepcopy
from dm_control import mjcf
from mushroom_rl.utils.running_stats import *
import loco_mujoco
from loco_mujoco.environments import ValidTaskConf
from loco_mujoco.environments.humanoids.base_humanoid import BaseHumanoid
from loco_mujoco.utils.reward import MultiTargetVelocityReward
from loco_mujoco.utils import check_validity_task_mode_dataset
[docs]
class BaseHumanoid4Ages(BaseHumanoid):
"""
MuJoCo simulation of 4 simplified humanoid models.
At the beginning of each episode, one of the four humanoid models are
sampled and used to simulate a trajectory. The different humanoids should
resemble an adult, a teenager (∼12 years), a child (∼5 years), and a
toddler (∼1-2 years). This environment can be partially observable by
using state masks to hide the humanoid type indicator from the policy.
"""
def __init__(self, scaling=None, scaling_trajectory_map=None, use_muscles=False,
use_box_feet=True, disable_arms=True, alpha_box_feet=0.5, **kwargs):
"""
Constructor.
Args:
scaling (float or list): Scaling of the humanoids. Should be in > 0.0.
scaling_trajectory_map (list): A list that contains tuples of two integers
for each scaling. Given a set of trajectories, they define the range of
the valid trajectory numbers for each scaling factor.
use_muscles (bool): If True, muscle actuators are used, else one torque actuator
ss used per joint.
use_box_feet (bool): If True, boxes are used as feet (for simplification).
disable_arms (bool): If True, all arm joints are removed and the respective
actuators are removed from the action specification.
alpha_box_feet (float): Alpha parameter of the boxes, which might be added as feet.
"""
if use_muscles:
xml_path = (Path(__file__).resolve().parent.parent / "data" / "humanoid" /
"humanoid_muscle.xml").as_posix()
else:
xml_path = (Path(__file__).resolve().parent.parent / "data" / "humanoid" /
"humanoid_torque.xml").as_posix()
self._use_muscles = use_muscles
action_spec = self._get_action_specification(use_muscles)
observation_spec = self._get_observation_specification()
self._hidable_obs = ("positions", "velocities", "foot_forces", "env_type")
# 0.4 ~ 1-2 year old infant who just started walking
# 0.6 ~ 5 year old boy
# 0.8 ~ 12 year old boy
# 1.0 ~ 20 year old man
self._default_scalings = [0.4, 0.6, 0.8, 1.0]
if scaling is None:
self._scalings = self._default_scalings
else:
if type(scaling) == list:
self._scalings = scaling
else:
self._scalings = [scaling]
self._scaling_trajectory_map = scaling_trajectory_map
# --- Modify the xml, the action_spec, and the observation_spec if needed ---
self._use_box_feet = use_box_feet
self._disable_arms = disable_arms
joints_to_remove, motors_to_remove, equ_constr_to_remove, collision_groups = self._get_xml_modifications()
xml_handle = mjcf.from_path(xml_path)
xml_handles = [self.scale_body(deepcopy(xml_handle), scaling, use_muscles) for scaling in self._scalings]
if use_box_feet or disable_arms:
obs_to_remove = ["q_" + j for j in joints_to_remove] + ["dq_" + j for j in joints_to_remove]
observation_spec = [elem for elem in observation_spec if elem[0] not in obs_to_remove]
action_spec = [ac for ac in action_spec if ac not in motors_to_remove]
for handle, scale in zip(xml_handles, self._scalings):
handle = self._delete_from_xml_handle(handle, joints_to_remove,
motors_to_remove, equ_constr_to_remove)
if use_box_feet:
handle = self._add_box_feet_to_xml_handle(handle, alpha_box_feet, scale)
if disable_arms:
self._reorient_arms(handle)
# call gran-parent
super(BaseHumanoid, self).__init__(xml_handles, action_spec, observation_spec, collision_groups, **kwargs)
if scaling_trajectory_map is not None and self.trajectories is None:
warnings.warn("You have defined a scaling_trajectory_map, but no trajectory was defined. The former "
"will have no effect.")
[docs]
def setup(self, obs):
"""
Function to setup the initial state of the simulation. Initialization can be done either
randomly, from a certain initial, or from the default initial state of the model. If random
is chosen, a trajectory is sampled based on the current model.
Args:
obs (np.array): Observation to initialize the environment from;
"""
self._reward_function.reset_state()
if obs is not None:
raise TypeError("Initializing the environment from an observation is "
"not allowed in this environment.")
else:
if not self.trajectories and self._random_start:
raise ValueError("Random start not possible without trajectory data.")
elif not self.trajectories and self._init_step_no is not None:
raise ValueError("Setting an initial step is not possible without trajectory data.")
elif self._init_step_no is not None and self._random_start:
raise ValueError("Either use a random start or set an initial step, not both.")
if self.trajectories is not None:
if self._random_start:
if self._scaling_trajectory_map:
curr_model = self._current_model_idx
valid_traj_range = self._scaling_trajectory_map[curr_model]
traj_no = np.random.randint(valid_traj_range[0], valid_traj_range[1])
sample = self.trajectories.reset_trajectory(traj_no=traj_no)
else:
sample = self.trajectories.reset_trajectory()
elif self._init_step_no:
traj_len = self.trajectories.trajectory_length
n_traj = self.trajectories.nnumber_of_trajectories
assert self._init_step_no <= traj_len * n_traj
substep_no = int(self._init_step_no % traj_len)
traj_no = int(self._init_step_no / traj_len)
sample = self.trajectories.reset_trajectory(substep_no, traj_no)
self.set_sim_state(sample)
[docs]
def load_trajectory(self, traj_params, scaling_trajectory_map=None, warn=True):
"""
Loads trajectories. If there were trajectories loaded already, this function overrides the latter.
Args:
traj_params (dict): Dictionary of parameters needed to load trajectories;
scaling_trajectory_map (list): A list that contains tuples of two integers
for each scaling. Given a set of trajectories, they define the range of
the valid trajectory numbers for each scaling factor.
warn (bool): If True, a warning will be raised if scaling_trajectory_map is not set
or the trajectory ranges are violated.
"""
super().load_trajectory(traj_params, warn)
if scaling_trajectory_map is None:
if self._scaling_trajectory_map is None and type(self._scalings) is list and len(self._scalings) > 1:
if warn:
warnings.warn("\"scaling_trajectory_map\" is not defined! Loading the default map, which assumes that "
"the trajectory contains an equal number of trajectories for all scalings and that"
"they are ordered in the following order %s." % self._scalings)
n_trajs_per_scaling = self.trajectories.number_of_trajectories / len(self._scalings)
assert n_trajs_per_scaling.is_integer(), "Failed to construct the default" \
"\"scaling_trajectory_map\". The number of trajectory " \
"can not be divided by the number of scalings!"
n_trajs_per_scaling = int(n_trajs_per_scaling)
current_low_idx = 0
self._scaling_trajectory_map = []
for i in range(self.trajectories.number_of_trajectories):
current_high_idx = current_low_idx + n_trajs_per_scaling
self._scaling_trajectory_map.append((current_low_idx, current_high_idx))
current_low_idx = current_high_idx
else:
# only one scaling used
self._scaling_trajectory_map = None
else:
self._scaling_trajectory_map = scaling_trajectory_map
[docs]
def get_mask(self, obs_to_hide):
"""
This function returns a boolean mask to hide observations from a fully observable state.
Args:
obs_to_hide (tuple): A tuple of strings with names of objects to hide.
Hidable objects are "positions", "velocities", "foot_forces", and "env_type".
Returns:
Mask in form of a np.array of booleans. True means that that the obs should be
included, and False means that it should be discarded.
"""
if type(obs_to_hide) == str:
obs_to_hide = (obs_to_hide,)
assert all(x in self._hidable_obs for x in obs_to_hide), "Some of the observations you want to hide are not" \
"supported. Valid observations to hide are %s." \
% (self._hidable_obs,)
pos_dim = len(self._get_joint_pos()) - 2
vel_dim = len(self._get_joint_vel())
force_dim = self._get_grf_size()
env_id_dim = len(self._get_env_id_map(0, self.n_all_models))
mask = []
if "positions" not in obs_to_hide:
mask += [np.ones(pos_dim, dtype=bool)]
else:
mask += [np.zeros(pos_dim, dtype=bool)]
if "velocities" not in obs_to_hide:
mask += [np.ones(vel_dim, dtype=bool)]
else:
mask += [np.zeros(vel_dim, dtype=bool)]
if self._use_foot_forces:
if "foot_forces" not in obs_to_hide:
mask += [np.ones(force_dim, dtype=bool)]
else:
mask += [np.zeros(force_dim, dtype=bool)]
else:
assert "foot_forces" not in obs_to_hide, "Creating a mask to hide foot forces without activating " \
"the latter is not allowed."
if self.more_than_one_env:
if "env_type" not in obs_to_hide:
mask += [np.ones(env_id_dim, dtype=bool)]
else:
mask += [np.zeros(env_id_dim, dtype=bool)]
else:
assert "env_type" not in obs_to_hide, "Creating a mask to hide the env type without having more than " \
"one env is not allowed."
return np.concatenate(mask).ravel()
def _get_observation_space(self):
"""
Returns a tuple of the lows and highs (np.array) of the observation space.
"""
low, high = super(BaseHumanoid4Ages, self)._get_observation_space()
len_env_map = len(self._get_env_id_map(self._current_model_idx, self.n_all_models))
low = np.concatenate([low, np.zeros(len_env_map)])
high = np.concatenate([high, np.ones(len_env_map)])
return low, high
def _create_observation(self, obs):
"""
Creates a full vector of observations.
Args:
obs (np.array): Observation vector to be modified or extended;
Returns:
New observation vector (np.array);
"""
obs = super(BaseHumanoid4Ages, self)._create_observation(obs)
if self.more_than_one_env:
model_idx = self._current_model_idx
else:
model_idx = self._default_scalings.index(self._scalings[0])
env_id_map = self._get_env_id_map(model_idx, self.n_all_models)
obs = np.concatenate([obs, env_id_map])
return obs
def _get_reward_function(self, reward_type, reward_params):
"""
Constructs a reward function.
Args:
reward_type (string): Name of the reward.
reward_params (dict): Parameters of the reward function.
Returns:
Reward function.
"""
if reward_type == "multi_target_velocity":
x_vel_idx = self.get_obs_idx("dq_pelvis_tx")
assert len(x_vel_idx) == 1
x_vel_idx = x_vel_idx[0]
env_id_len = len(self._get_env_id_map(0, self.n_all_models))
goal_reward_func = MultiTargetVelocityReward(x_vel_idx=x_vel_idx, scalings=self._default_scalings,
env_id_len=env_id_len, **reward_params)
else:
goal_reward_func = super()._get_reward_function(reward_type, reward_params)
return goal_reward_func
[docs]
@staticmethod
def scale_body(xml_handle, scaling, use_muscles):
"""
This function scales the kinematics and dynamics of the humanoid model given a Mujoco XML handle.
Args:
xml_handle: Handle to Mujoco XML.
scaling (float): Scaling factor.
Returns:
Modified Mujoco XML handle.
"""
body_scaling = scaling
mesh_handle = xml_handle.find_all("mesh")
head_geoms = ["hat_skull", "hat_jaw", "hat_ribs_cap"]
for h in mesh_handle:
if h.name not in head_geoms: # don't scale head
h.scale *= body_scaling
for h in xml_handle.find_all("geom"):
if h.name in head_geoms: # change position of head
h.pos = [0.0, -0.5 * (1 - scaling), 0.0]
body_handle = xml_handle.find_all("body")
for h in body_handle:
h.pos *= body_scaling
h.inertial.mass *= body_scaling ** 3
# Diagonal elements of the inertia matrix change v with scaling.
# As all off-diagonal elements are 0 here.
h.inertial.fullinertia *= body_scaling ** 5
assert np.array_equal(h.inertial.fullinertia[3:], np.zeros(3)), "Some of the diagonal elements of the" \
"inertia matrix are not zero! Scaling is" \
"not done correctly. Double-Check!"
if use_muscles:
for h in body_handle:
for s in h.site:
s.pos *= body_scaling
actuator_handle = xml_handle.find_all("actuator")
for h in actuator_handle:
if "mot" not in h.name:
h.force *= body_scaling ** 2
if not use_muscles:
actuator_handle = xml_handle.find_all("actuator")
for h in actuator_handle:
h.gear *= body_scaling ** 2
return xml_handle
[docs]
@staticmethod
def generate(env, path, task="walk", mode="all", dataset_type="real", n_models=None, debug=False, **kwargs):
"""
Returns a Humanoid environment corresponding to the specified task.
Args:
env (class): Humanoid class, either HumanoidTorque4Ages or HumanoidMuscle4Ages.
path (str): Path to the dataset.
task (str): Main task to solve. Either "walk" or "run".
mode (str): Mode of the environment. Either "all" (sample between all humanoid envs), "1"
(smallest humanoid), "2" (second smallest humanoid), "3" (teenage humanoid), and "4" (adult humanoid).
dataset_type (str): "real" or "perfect". "real" uses real motion capture data as the
reference trajectory. This data does not perfectly match the kinematics
and dynamics of this environment, hence it is more challenging. "perfect" uses
a perfect dataset.
debug (bool): If True, the smaller test datasets are used for debugging purposes.
Returns:
An MDP of a set of Torque or Muscle Humanoid of different sizes.
"""
if mode == "all":
dataset_suffix = "_all.npz"
scaling = None
elif mode == "1":
dataset_suffix = "_1.npz"
scaling = 0.4
elif mode == "2":
dataset_suffix = "_2.npz"
scaling = 0.6
elif mode == "3":
dataset_suffix = "_3.npz"
scaling = 0.8
elif mode == "4":
dataset_suffix = "_4.npz"
scaling = 1.0
if dataset_type == "real":
local_path = path + dataset_suffix
use_mini_dataset = not os.path.exists(Path(loco_mujoco.__file__).resolve().parent / local_path)
if debug or use_mini_dataset:
if use_mini_dataset:
warnings.warn("Datasets not found, falling back to test datasets. Please download and install "
"the datasets to use this environment for imitation learning!")
local_path = local_path.split("/")
local_path.insert(3, "mini_datasets")
local_path = "/".join(local_path)
traj_path = Path(loco_mujoco.__file__).resolve().parent / local_path
elif dataset_type == "perfect":
local_path = path + dataset_suffix
traj_path = Path(loco_mujoco.__file__).resolve().parent / local_path
if task == "walk":
reward_params = dict(target_velocity=1.25)
elif task == "run":
reward_params = dict(target_velocity=2.5)
# Generate the MDP
mdp = env(scaling=scaling, reward_type="multi_target_velocity", reward_params=reward_params, **kwargs)
# Load the trajectory
env_freq = 1 / mdp._timestep # hz
desired_contr_freq = 1 / mdp.dt # hz
n_substeps = env_freq // desired_contr_freq
if dataset_type == "real":
traj_data_freq = 500 # hz
traj_params = dict(traj_path=traj_path,
traj_dt=(1 / traj_data_freq),
control_dt=(1 / desired_contr_freq))
elif dataset_type == "perfect":
traj_data_freq = 100 # hz
traj_files = mdp.load_dataset_and_get_traj_files(traj_path, traj_data_freq)
traj_params = dict(traj_files=traj_files,
traj_dt=(1 / traj_data_freq),
control_dt=(1 / desired_contr_freq))
mdp.load_trajectory(traj_params, warn=False)
return mdp
@property
def n_all_models(self):
return len(self._default_scalings)