Source code for loco_mujoco.utils.trajectory

import warnings
from copy import deepcopy

import numpy as np
from scipy import interpolate


[docs] class Trajectory: """ General class to handle trajectory data. It builds a general trajectory from a numpy bin file(.npy), and automatically interpolates the trajectory to the desired control frequency. This class is used to generate datasets and to sample from the dataset to initialize the simulation. All trajectories are required to be of equal length. """ def __init__(self, keys, low, high, joint_pos_idx, interpolate_map, interpolate_remap, traj_path=None, traj_files=None, interpolate_map_params=None, interpolate_remap_params=None, traj_dt=0.002, control_dt=0.01, ignore_keys=None, clip_trajectory_to_joint_ranges=False, traj_info=None, warn=True): """ Constructor. Args: keys (list): List of keys to extract data from the trajectories. low (np.array): Lower bound of the trajectory values. high (np.array): Upper bound of the trajectory values. joint_pos_idx (np.array): Array including all indices of the joint positions in the trajectory. interpolate_map (func): Function used to map a trajectory to some space that allows interpolation. interpolate_remap (func): Function used to map a transformed trajectory back to the original space after interpolation. traj_path (string): path with the trajectory for the model to follow. Should be a numpy zipped file (.npz) with a 'trajectory_data' array and possibly a 'split_points' array inside. The 'trajectory_data' should be in the shape (joints x observations). If traj_files is specified, this should be None. traj_files (dict): Dictionary containing all trajectory files. If traj_path is specified, this should be None. interpolate_map_params: Set of parameters needed to do the interpolation by the Unitree environment. interpolate_remap_params: Set of parameters needed to do the interpolation by the Unitree environment. traj_dt (float): Time step of the trajectory file. control_dt (float): Model control frequency used to interpolate the trajectory. ignore_keys (list): List of keys to ignore in the dataset. clip_trajectory_to_joint_ranges (bool): If True, the joint positions in the trajectory are clipped between the low and high values in the trajectory. traj_info (list): A list of custom labels for each trajectory. warn (bool): If True, a warning will be raised, if some trajectory ranges are violated. """ assert (traj_path is not None) != (traj_files is not None), "Please specify either traj_path or " \ "traj_files, but not both." # load data if traj_path is not None: self._trajectory_files = np.load(traj_path, allow_pickle=True) else: self._trajectory_files = traj_files # convert to dict to be mutable self._trajectory_files = {k: d for k, d in self._trajectory_files.items()} self.check_if_trajectory_is_in_range(low, high, keys, joint_pos_idx, warn, clip_trajectory_to_joint_ranges) # add all goals to keys (goals have to start with 'goal' if not in keys) keys += [key for key in self._trajectory_files.keys() if key.startswith('goal') and key not in keys] self.keys = keys # remove unwanted keys if ignore_keys is not None: for ik in ignore_keys: keys.remove(ik) # split points mark the beginning of the next trajectory. # The last split_point points to the index behind the last element of the trajectories -> len(traj) if "split_points" in self._trajectory_files.keys(): self.split_points = self._trajectory_files["split_points"] else: self.split_points = np.array([0, len(list(self._trajectory_files.values())[0])]) # Extract trajectory from files. This returns a list of np.arrays. The length of the # list is the number of observations. Each np.array has the shape # (n_trajectories, n_samples, (dim_observation)). If dim_observation is one # the shape of the array is just (n_trajectories, n_samples). self.trajectories = self._extract_trajectory_from_files() if traj_info is not None: assert len(traj_info) == self.number_of_trajectories, "The number of trajectory infos/labels need " \ "to be equal to the number of trajectories." self._traj_info = traj_info self.traj_dt = traj_dt self.control_dt = control_dt # interpolation of the trajectories if self.traj_dt != control_dt: self._interpolate_trajectories(map_funct=interpolate_map, map_params=interpolate_map_params, re_map_funct=interpolate_remap, re_map_params=interpolate_remap_params) self.subtraj_step_no = 0 self.traj_no = 0 self.subtraj = self._get_subtraj(self.traj_no)
[docs] def create_dataset(self, ignore_keys=None, state_callback=None, state_callback_params=None): """ Creates a dataset used by imitation learning algorithms. Args: ignore_keys (list): List of keys to ignore in the dataset. state_callback (func): Function that should be called on each state. state_callback_params (dict): Dictionary of parameters needed to make the state transformation. Returns: Dictionary containing states, next_states, absorbing and last flags. For the states the shape is (N_traj x N_samples_per_traj-1, dim_state), while the flags have the shape (N_traj x N_samples_per_traj-1). If traj_info was specified, it will also include that. """ flat_traj = self.flattened_trajectories() # create a dict and extract all elements except the ones specified in ignore_keys. all_data = dict(zip(self.keys, deepcopy(list(flat_traj)))) if ignore_keys is not None: for ikey in ignore_keys: del all_data[ikey] traj = list(all_data.values()) # create states array shape=(n_states, dim_obs) states = np.concatenate(traj, axis=1) if state_callback is not None: transformed_states = [] for state in states: transformed_states.append(state_callback(state, **state_callback_params)) states = np.array(transformed_states) # convert to dict with states and next_states new_states = states[:-1] new_next_states = states[1:] absorbing = np.zeros(len(states[:-1])) # we assume that there are no absorbing states in the trajectory last = np.zeros(len(states)) last[self.split_points[1:]-1] = np.ones(len(self.split_points)-1) if self._traj_info is not None: info = np.array([[l] * self.trajectory_length for l in self._traj_info]).reshape(-1) return dict(states=new_states, next_states=new_next_states, absorbing=absorbing, last=last, info=info) else: return dict(states=new_states, next_states=new_next_states, absorbing=absorbing, last=last)
def _extract_trajectory_from_files(self): """ Extracts the trajectory from the trajectory files by filtering for the relevant keys. The trajectory is then split to multiple trajectories using the split points. Returns: A list of np.arrays. The length of the list is the number of observations. Each np.array has the shape (n_trajectories, n_samples, (dim_observation)). If dim_observation is one the shape of the array is just (n_trajectories, n_samples). """ # load data of relevant keys trajectories = [self._trajectory_files[key] for key in self.keys] # check that all observations have equal lengths len_obs = np.array([len(obs) for obs in trajectories]) assert np.all(len_obs == len_obs[0]), "Some observations have different lengths than others. " \ "Trajectory is corrupted. " # split trajectory into multiple trajectories using split points for i in range(len(trajectories)): trajectories[i] = np.split(trajectories[i], self.split_points[1:-1]) # check if all trajectories are of equal length len_trajectories = np.array([len(traj) for traj in trajectories[i]]) assert np.all(len_trajectories == len_trajectories[0]), "Only trajectories of equal length " \ "are currently supported." trajectories[i] = np.array(trajectories[i]) return trajectories def _interpolate_trajectories(self, map_funct, re_map_funct, map_params, re_map_params): """ Interpolates all trajectories cubically. Args: map_funct (func): Function used to map a trajectory to some space that allows interpolation. re_map_funct (func): Function used to map a transformed trajectory back to the original space after interpolation. map_params: Set of parameters needed to do the interpolation by the respective environment. re_map_params: Set of parameters needed to do the interpolation by the respective environment. """ assert (map_funct is None) == (re_map_funct is None) new_trajs = list() # interpolate over each trajectory for i in range(self.number_of_trajectories): traj = [obs[i] for obs in self.trajectories] x = np.arange(self.trajectory_length) new_traj_sampling_factor = self.traj_dt / self.control_dt x_new = np.linspace(0, self.trajectory_length - 1, round(self.trajectory_length * new_traj_sampling_factor), endpoint=True) # preprocess trajectory traj = map_funct(traj) if map_params is None else map_funct(traj, **map_params) new_traj = interpolate.interp1d(x, traj, kind="cubic", axis=1)(x_new) # postprocess trajectory new_traj = re_map_funct(new_traj) if re_map_params is None else re_map_funct(new_traj, **re_map_params) new_trajs.append(new_traj) # convert trajectory back to original shape trajectories = [] for i in range(self.number_obs_trajectory): trajectories.append([]) for traj in new_trajs: trajectories[i].append(traj[i]) trajectories[i] = np.array(trajectories[i]) self.trajectories = trajectories # interpolation of split_points self.split_points = [0] for k in range(self.number_of_trajectories): self.split_points.append(self.split_points[-1] + len(self.trajectories[0][k])) self.split_points = np.array(self.split_points)
[docs] def reset_trajectory(self, substep_no=None, traj_no=None): """ Resets the trajectory to a certain trajectory and a substep within that trajectory. If one of them is None, they are set randomly. Args: substep_no (int, None): Starting point of the trajectory. If None, the trajectory starts from a random point. traj_no (int, None): Number of the trajectory to start from. If None, it starts from a random trajectory Returns: The chosen (or randomly sampled) sample from a trajectory. """ if traj_no is None: self.traj_no = np.random.randint(0, self.number_of_trajectories) else: assert 0 <= traj_no <= self.number_of_trajectories self.traj_no = traj_no if substep_no is None: self.subtraj_step_no = np.random.randint(0, self.trajectory_length) else: assert 0 <= substep_no <= self.trajectory_length self.subtraj_step_no = substep_no # choose a sub trajectory self.subtraj = self._get_subtraj(self.traj_no) # reset x and y to middle position self.subtraj[0] -= self.subtraj[0][self.subtraj_step_no] self.subtraj[1] -= self.subtraj[1][self.subtraj_step_no] sample = [obs[self.subtraj_step_no] for obs in self.subtraj] return sample
[docs] def check_if_trajectory_is_in_range(self, low, high, keys, j_idx, warn, clip_trajectory_to_joint_ranges): if warn or clip_trajectory_to_joint_ranges: # get q_pos indices j_idx = j_idx[2:] # exclude x and y highs = dict(zip(keys[2:], high)) lows = dict(zip(keys[2:], low)) # check if they are in range for i, item in enumerate(self._trajectory_files.items()): k, d = item if i in j_idx and k in keys: if warn: clip_message = "Clipping the trajectory into range!" if clip_trajectory_to_joint_ranges else "" if np.max(d) > highs[k]: warnings.warn("Trajectory violates joint range in %s. Maximum in trajectory is %f " "and maximum range is %f. %s" % (k, np.max(d), highs[k], clip_message), RuntimeWarning) elif np.min(d) < lows[k]: warnings.warn("Trajectory violates joint range in %s. Minimum in trajectory is %f " "and minimum range is %f. %s" % (k, np.min(d), lows[k], clip_message), RuntimeWarning) # clip trajectory to min & max if clip_trajectory_to_joint_ranges: self._trajectory_files[k] = np.clip(self._trajectory_files[k], lows[k], highs[k])
[docs] def get_current_sample(self): """ Returns the current sample in the trajectory. """ return self._get_ith_sample_from_subtraj(self.subtraj_step_no)
[docs] def get_next_sample(self): """ Returns the next sample in the trajectory. """ self.subtraj_step_no += 1 if self.subtraj_step_no == self.trajectory_length: sample = None else: sample = self._get_ith_sample_from_subtraj(self.subtraj_step_no) return sample
[docs] def get_from_sample(self, sample, key): """ Returns the part of the sample whose key is specified. In contrast to the function _get_from_obs from the base environment, this function also allows to access information that is in the trajectory, but not in the simulation such as goal definitions. Note: This function is not suited for getting an observation from environment samples! Args: sample (list or np.array): Current sample to extract an observation from. key (string): Name of the observation to extract from sample Returns: np.array consisting of the observation specified by the key. """ assert len(sample) == len(self.keys) idx = self.get_idx(key) return sample[idx]
[docs] def get_idx(self, key): """ Returns the index of the key. Note: This function is not suited for getting the index for an observation of the environment! Args: key (string): Name of the observation to extract from sample Returns: int containing the desired index. """ return self.keys.index(key)
[docs] def flattened_trajectories(self): """ Returns the trajectories flattened in the N_traj dimension. Also expands dim if obs has dimension 1. """ trajectories = [] for obs in self.trajectories: if len(obs.shape) == 2: trajectories.append(obs.reshape((-1, 1))) elif len(obs.shape) == 3: trajectories.append(obs.reshape((-1, obs.shape[2]))) else: raise ValueError("Unsupported shape of observation %s." % obs.shape) return trajectories
def _get_subtraj(self, i): """ Returns a copy of the i-th trajectory included in trajectories. """ return [obs[i].copy() for obs in self.trajectories] def _get_ith_sample_from_subtraj(self, i): """ Returns a copy of the i-th sample included in the current subtraj. """ return [np.array(obs[i].copy()).flatten() for obs in self.subtraj] @property def number_obs_trajectory(self): """ Returns the number of observations in the trajectory. """ return len(self.trajectories) @property def trajectory_length(self): """ Returns the length of a trajectory. Note that all trajectories have to be equal in length. """ return self.trajectories[0].shape[1] @property def number_of_trajectories(self): """ Returns the number of trajectories. """ return self.trajectories[0].shape[0]