Source code for quarticgym.envs.utils

from torch.utils.data import Dataset
import json
import os
import random
# ---- to capture numpy warnings ----
import warnings

import matplotlib.pyplot as plt
import numpy as np
from gym import spaces, Env  # to create an openai-gym environment https://gym.openai.com/
from mzutils import SimplePriorityQueue, normalize_spaces, denormalize_spaces, mkdir_p
from scipy.integrate import solve_ivp  # the ode solver
from tqdm import tqdm

class TorchDatasetFromD4RL(Dataset):
    """Wrap a d4rl-format dictionary so it can be consumed as a ``torch.utils.data.Dataset``.

    The dictionary is converted into a ``d3rlpy.dataset.MDPDataset``; length and
    indexing are delegated to that object.
    """

    def __init__(self, dataset_d4rl) -> None:
        """
        dataset_d4rl should be a dictionary in the d4rl dataset format.

        Only the 'observations', 'actions', 'rewards' and 'terminals' entries are read.
        """
        # d3rlpy is imported lazily so that importing this module does not require it.
        import d3rlpy

        self.dataset = d3rlpy.dataset.MDPDataset(dataset_d4rl['observations'], dataset_d4rl['actions'],
                                                 dataset_d4rl['rewards'], dataset_d4rl['terminals'])

    def __len__(self):
        """Return the number of items held by the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return item ``idx`` of the wrapped dataset as a dict of observations, actions and rewards."""
        episode = self.dataset[idx]
        return {'observations': episode.observations, 'actions': episode.actions, 'rewards': episode.rewards}
class QuarticGymEnvBase(Env):
    """Template base class for QuarticGym environments.

    The places marked ``TOMODIFY`` are where a concrete environment plugs in its own
    dynamics and reward. This class reads, but never assigns, the following members,
    so a concrete environment has to provide them: ``max_steps``, ``observation_name``,
    ``action_name`` and ``sample_initial_state()``. ``np_dtype`` is also read; when it
    is missing at construction time it falls back to ``np.float32``.
    """

    def __init__(self, dense_reward=True, normalize=True, debug_mode=False, action_dim=2, observation_dim=3,
                 reward_function=None, done_calculator=None, max_observations=(1.0, 1.0),
                 min_observations=(-1.0, -1.0), max_actions=(1.0, 1.0), min_actions=(-1.0, -1.0),
                 error_reward=-100.0):
        """the __init__ of a QuarticGym environment.

        Args:
            dense_reward (bool, optional): Whether returns a dense reward or not. If True, will try to return a
                reward for each step. If False, will return a reward at the end of the episode. Defaults to True.
            normalize (bool, optional): Whether to normalize the actions taken and observations returned to a
                certain range. If True, the range is [-1, 1]^n where n is the dimension of an
                action/observation. Defaults to True.
            debug_mode (bool, optional): Whether to return or print extra information. Defaults to False.
            action_dim (int, optional): Dimensionality of an action. Defaults to 2.
            observation_dim (int, optional): Dimensionality of an observation. Defaults to 3.
            reward_function (callable, optional): Provide to replace with your custom reward function. When not
                given, use environments' default reward function. Defaults to None.
            done_calculator (callable, optional): Provide to replace with your custom "episode end here
                calculator". When not given, use environments' default done calculator. Defaults to None.
            max_observations (sequence, optional): Defaults to (1.0, 1.0).
            min_observations (sequence, optional): Defaults to (-1.0, -1.0).
            max_actions (sequence, optional): Defaults to (1.0, 1.0).
            min_actions (sequence, optional): Defaults to (-1.0, -1.0).
            error_reward (float, optional): Reward used when an error is encountered during an episode (this
                typically means something really bad, like tank overflow). Defaults to -100.0.
        """
        # episode bookkeeping
        self.step_count = 0
        self.total_reward = 0
        self.done = False

        # configuration
        self.dense_reward = dense_reward
        # whether observations and actions are exchanged in [-1, 1], which is common in most RL algorithms
        self.normalize = normalize
        self.debug_mode = debug_mode  # to print debug information.
        self.action_dim = action_dim
        self.observation_dim = observation_dim
        self.error_reward = error_reward
        # fall back to the in-house implementations when no custom hook is supplied
        self.reward_function = self.reward_function_standard if reward_function is None else reward_function
        self.done_calculator = self.done_calculator_standard if done_calculator is None else done_calculator

        # np_dtype is not a constructor argument; keep a value provided by a subclass (class attribute or
        # set before this call) and otherwise use float32 so the bounds below can be built.
        self.np_dtype = getattr(self, "np_dtype", np.float32)

        # define the state and action spaces
        self.max_observations = np.array(max_observations, dtype=self.np_dtype)
        self.min_observations = np.array(min_observations, dtype=self.np_dtype)
        self.max_actions = np.array(max_actions, dtype=self.np_dtype)
        self.min_actions = np.array(min_actions, dtype=self.np_dtype)
        if self.normalize:
            self.observation_space = spaces.Box(low=-1, high=1, shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=-1, high=1, shape=(self.action_dim,))
        else:
            self.observation_space = spaces.Box(low=self.min_observations, high=self.max_observations,
                                                shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=self.min_actions, high=self.max_actions, shape=(self.action_dim,))

    def observation_beyond_box(self, observation):
        """
        check if the observation is beyond the box, which is what we don't want.

        Returns a truthy value when any element is above ``max_observations``, below
        ``min_observations``, NaN or infinite.
        """
        return (np.any(observation > self.max_observations)
                or np.any(observation < self.min_observations)
                or np.any(np.isnan(observation))
                or np.any(np.isinf(observation)))

    def reward_function_standard(self, previous_observation, action, current_observation, reward=None):
        """
        default reward function (s, a, r, s, a).

        If ``reward`` is already given it is returned unchanged. If the current observation
        is beyond the box, ``error_reward`` is returned. Otherwise the reward computed at the
        TOMODIFY marker is returned, bounded from below by ``error_reward``.
        """
        if reward is not None:
            return reward
        if self.observation_beyond_box(current_observation):
            return self.error_reward
        # TOMODIFY: insert your own reward function here.
        # Neutral placeholder so that the template does not call max() on None.
        reward = 0.0
        reward = max(self.error_reward, reward)  # reward cannot be smaller than the error_reward
        if self.debug_mode:
            print("reward:", reward)
        return reward

    def done_calculator_standard(self, current_observation, step_count, reward, done=None, done_info=None):
        """
        check whether the current episode is considered finished.
        returns a boolean value indicated done or not, and a dictionary with information.
        here in done_calculator_standard, done_info looks like {"terminal": boolean, "timeout": boolean},
        where "timeout" is true when episode end due to reaching the maximum episode length,
        "terminal" is true when "timeout" or episode end due to termination conditions such as env error
        encountered. (basically done)

        Raises:
            ValueError: when ``done`` is given without ``done_info``.
        """
        if done is not None:
            # the caller already decided; just hand the decision back
            if done_info is None:
                raise ValueError("When done is given, done_info should also be given.")
            return done, done_info

        done = False
        if done_info is None:
            done_info = {"terminal": False, "timeout": False}
        elif done_info["terminal"] or done_info["timeout"]:
            return True, done_info

        if self.observation_beyond_box(current_observation):
            done_info["terminal"] = True
            done = True
        if reward == self.error_reward:
            done_info["terminal"] = True
            done = True
        if step_count >= self.max_steps:  # same as range(0, max_steps)
            done_info["terminal"] = True
            done_info["timeout"] = True
            done = True
        return done, done_info

    def reset(self, initial_state=None, normalize=None):
        """
        required by gym.
        This function resets the environment and returns an initial observation.

        initial_state: state to start from; when None, ``self.sample_initial_state()`` is used.
        normalize: overrides ``self.normalize`` for the returned observation when not None.
        """
        self.step_count = 0
        self.total_reward = 0
        self.done = False
        if initial_state is not None:
            observation = np.array(initial_state, dtype=self.np_dtype)
        else:
            observation = self.sample_initial_state()
        self.init_observation = observation
        self.previous_observation = observation
        # TOMODIFY: reset your environment here.
        normalize = self.normalize if normalize is None else normalize
        if normalize:
            observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
        return observation

    def step(self, action, normalize=None):
        """
        required by gym.
        This function performs one step within the environment and returns the observation, the reward,
        whether the episode is finished and debug information, if any.

        normalize: overrides ``self.normalize`` for this call when not None. When truthy, the action is
        passed through ``denormalize_spaces`` before use and the returned observation through
        ``normalize_spaces``.
        """
        if self.debug_mode:
            print("action:", action)
        reward = None
        done = None
        done_info = {"terminal": False, "timeout": False}
        action = np.array(action, dtype=self.np_dtype)
        normalize = self.normalize if normalize is None else normalize
        if normalize:
            action, _, _ = denormalize_spaces(action, self.max_actions, self.min_actions)

        # TOMODIFY: proceed your environment here and collect the observation.
        # Kept as an ndarray because it is clipped with ``.clip`` below.
        observation = np.array([0.0, 0.0], dtype=self.np_dtype)

        # compute reward, unless the environment code above already produced one
        if reward is None:
            reward = self.reward_function(self.previous_observation, action, observation, reward=reward)
        # compute done, unless the environment code above already decided
        if done is None:
            done, done_info = self.done_calculator(observation, self.step_count, reward, done=done,
                                                   done_info=done_info)
        self.previous_observation = observation

        self.total_reward += reward
        if not self.dense_reward:
            # sparse mode: hand out the accumulated reward only on the final step
            reward = self.total_reward if done else 0.0

        # clip observation so that it won't be beyond the box
        observation = observation.clip(self.min_observations, self.max_observations)
        if normalize:
            observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
        self.step_count += 1
        info = dict(done_info)
        return observation, reward, done, info

    def set_initial_states(self, initial_states, num_episodes):
        """
        resolve the initial states used for ``num_episodes`` episodes.

        initial_states: None (sample one state per episode with ``self.sample_initial_state()``), a string
        (loaded with ``np.load``) or a sequence of initial states, which is used as given.
        The result must contain exactly ``num_episodes`` entries.
        """
        if initial_states is None:
            initial_states = [self.sample_initial_state() for _ in range(num_episodes)]
        elif isinstance(initial_states, str):
            initial_states = np.load(initial_states)
        assert len(initial_states) == num_episodes, "number of initial states should equal num_episodes"
        return initial_states

    def _plot_episode_curves(self, title, curves, algorithms, plot_dir, file_name, initial_value=None):
        """Draw one figure with one curve per algorithm and save it under ``plot_dir`` when that is not None."""
        plt.close("all")
        plt.figure(0)
        plt.title(f"{title}")
        n_algorithms = len(algorithms)
        for n_algo, curve in enumerate(curves):
            # later algorithms are drawn more opaque than earlier ones
            alpha = 1 * (0.7 ** (n_algorithms - 1 - n_algo))
            _, algo_name, _ = algorithms[n_algo]
            plt.plot(curve, label=algo_name, alpha=alpha)
        if initial_value is not None:
            plt.plot([initial_value for _ in range(self.max_steps)], linestyle="--", label=f"initial_{title}")
        plt.xticks(np.arange(1, self.max_steps + 2, 1))
        if initial_value is not None:
            plt.annotate(str(initial_value), xy=(0, initial_value))
        plt.legend()
        if plot_dir is not None:
            plt.savefig(os.path.join(plot_dir, file_name))
        plt.close()

    def evalute_algorithms(self, algorithms, num_episodes=1, error_reward=-1000.0, initial_states=None, to_plt=True,
                           plot_dir='./plt_results'):
        """
        when excecuting evalute_algorithms, the self.normalize should be False.
        algorithms: list of (algorithm, algorithm_name, normalize). algorithm has to have a method
            predict(observation) -> action: np.ndarray.
        num_episodes: number of episodes to run
        error_reward: overwrites self.error_reward for the evaluation
        initial_states: None, location of numpy file of initial states or a (numpy) list of initial states
        to_plt: whether generates plot or not
        plot_dir: None or directory to save plots
        returns: (observations_list, actions_list, rewards_list) as numpy arrays, where
            observations_list[i][j][t][k] is algorithm_i_game_j_observation_t_element_k,
            actions_list[i][j][t][k] is algorithm_i_game_j_action_t_element_k and
            rewards_list[i][j][t] is algorithm_i_game_j_reward_t.
        """
        if self.normalize is not False:
            print("env.normalize should be False when executing evalute_algorithms")
        self.normalize = False
        self.error_reward = error_reward
        if plot_dir is not None:
            mkdir_p(plot_dir)
        initial_states = self.set_initial_states(initial_states, num_episodes)
        n_algorithms = len(algorithms)
        observations_list = [[] for _ in range(n_algorithms)]
        actions_list = [[] for _ in range(n_algorithms)]
        rewards_list = [[] for _ in range(n_algorithms)]

        for n_epi in tqdm(range(num_episodes)):
            for n_algo, (algo, _, normalize) in enumerate(algorithms):
                algo_observes = []
                algo_actions = []
                algo_rewards = []  # rewards of this trajectory for this algorithm
                # the initial observation is deliberately not recorded
                o = self.reset(initial_state=initial_states[n_epi])
                done = False
                while not done:
                    if normalize:
                        o, _, _ = normalize_spaces(o, self.max_observations, self.min_observations)
                    a = algo.predict(o)
                    if normalize:
                        a, _, _ = denormalize_spaces(a, self.max_actions, self.min_actions)
                    algo_actions.append(a)
                    o, r, done, _ = self.step(a)
                    algo_observes.append(o)
                    algo_rewards.append(r)
                observations_list[n_algo].append(algo_observes)
                actions_list[n_algo].append(algo_actions)
                rewards_list[n_algo].append(algo_rewards)

            if to_plt:
                # plot observations
                for n_o in range(self.observation_dim):
                    o_name = self.observation_name[n_o]
                    curves = [np.array(observations_list[n_algo][-1])[:, n_o] for n_algo in range(n_algorithms)]
                    self._plot_episode_curves(o_name, curves, algorithms, plot_dir,
                                              f"{n_epi}_observation_{o_name}.png",
                                              initial_value=initial_states[n_epi][n_o])
                # plot actions
                for n_a in range(self.action_dim):
                    a_name = self.action_name[n_a]
                    curves = [np.array(actions_list[n_algo][-1])[:, n_a] for n_algo in range(n_algorithms)]
                    self._plot_episode_curves(a_name, curves, algorithms, plot_dir,
                                              f"{n_epi}_action_{a_name}.png")
                # plot rewards
                curves = [np.array(rewards_list[n_algo][-1]) for n_algo in range(n_algorithms)]
                self._plot_episode_curves("reward", curves, algorithms, plot_dir, f"{n_epi}_reward.png")

        observations_list = np.array(observations_list)
        actions_list = np.array(actions_list)
        rewards_list = np.array(rewards_list)
        return observations_list, actions_list, rewards_list

    def generate_dataset_with_algorithm(self, algorithm, normalize=None, num_episodes=1, error_reward=-1000.0,
                                        initial_states=None, format='d4rl'):
        """
        this function aims to create a dataset for offline reinforcement learning, in either d4rl or pytorch
        format. the trajectories are generated by the algorithm, which interacts with this env initialized by
        initial_states.
        algorithm: an instance that has a method predict(observation) -> action: np.ndarray.
        normalize: whether the observation handed to the algorithm is normalized and its action denormalized;
            defaults to self.normalize.
        error_reward: accepted but not used by this method.
        if format == 'd4rl', returns a dictionary in d4rl format.
        else if format == 'torch', returns an object of type torch.utils.data.Dataset.
        any other format raises ValueError.
        """
        if normalize is None:
            normalize = self.normalize
        initial_states = self.set_initial_states(initial_states, num_episodes)
        dataset = {"observations": [], "actions": [], "rewards": [], "terminals": [], "timeouts": []}
        for n_epi in tqdm(range(num_episodes)):
            o = self.reset(initial_state=initial_states[n_epi])
            r = 0.0
            done = False
            timeout = False
            final_done = False  # to still record for the last t when done
            while not final_done:
                if done:
                    final_done = True
                # NOTE(review): reset/step also normalize when self.normalize is True, so the observation
                # may be normalized twice here -- confirm the intended combination of flags.
                tmp_o = o
                if normalize:
                    tmp_o, _, _ = normalize_spaces(tmp_o, self.max_observations, self.min_observations)
                a = algorithm.predict(tmp_o)
                if normalize:
                    a, _, _ = denormalize_spaces(a, self.max_actions, self.min_actions)
                dataset['observations'].append(o)
                dataset['actions'].append(a)
                dataset['rewards'].append(r)
                dataset['terminals'].append(done)
                dataset["timeouts"].append(timeout)

                o, r, done, info = self.step(a)
                timeout = info['timeout']
        dataset = {key: np.array(values) for key, values in dataset.items()}
        if format == 'd4rl':
            return dataset
        elif format == 'torch':
            return TorchDatasetFromD4RL(dataset)
        else:
            raise ValueError(f"format {format} is not supported.")