# -*- coding: utf-8 -*-
"""
AtropineEnv simulates an atropine production environment.
"""
import matplotlib.pyplot as plt
import numpy as np
from casadi import DM
from gym import spaces, Env
from .helpers.constants import USS, INPUT_REFS, OUTPUT_REF, SIM_TIME
from .helpers.helper_funcs import state_estimator
from .models.atropine_process import Plant
from .models.config import ND1, ND2, ND3, V1, V2, V3, V4, dt
from .utils import *
class AtropineEnvGym(QuarticGymEnvBase):
    """Gym environment simulating an atropine production process.

    The observation vector is a configurable concatenation of (in this order):
    t (1), action (4), USS (4), ESS (1), E-factor (1), Kalman-filter state (2),
    algebraic states z (30), differential states x (1694).
    """

    @staticmethod
    def _as_initial_state(source, name):
        """Coerce *source* (file path, list, or ndarray) into an ndarray of initial states.

        Raises TypeError (an Exception subclass, so existing callers catching
        Exception still work) for unsupported input types.
        """
        if isinstance(source, str):
            return np.loadtxt(source)
        if isinstance(source, np.ndarray):
            return source
        if isinstance(source, list):
            return np.array(source)
        raise TypeError("%s must be a string, list or a numpy array" % name)

    def __init__(self, normalize=True, max_steps: int = 60, x0_loc='quarticgym/datasets/atropineenv/x0.txt',
                 z0_loc='quarticgym/datasets/atropineenv/z0.txt', model_loc='quarticgym/datasets/atropineenv/model.npy',
                 uss_subtracted=False, reward_on_ess_subtracted=False, reward_on_steady=True,
                 reward_on_absolute_efactor=False, reward_on_actions_penalty=0.0, reward_on_reject_actions=True,
                 relaxed_max_min_actions=False, observation_include_t=True, observation_include_action=False,
                 observation_include_uss=True, observation_include_ess=True, observation_include_e=True,
                 observation_include_kf=True, observation_include_z=True, observation_include_x=False):
        self.normalize = normalize
        self.max_steps = max_steps  # how many steps this env can run; -1 means run forever
        self.action_dim = 4
        # We assume the steady-state inputs are visible during steps. If True,
        # the given actions are deltas and USS is added back before simulating.
        self.uss_subtracted = uss_subtracted
        self.reward_on_ess_subtracted = reward_on_ess_subtracted
        # Whether reward is based on closeness to the steady-state e-factor
        # rather than on the e-factor itself (smaller e-factor is better).
        self.reward_on_steady = reward_on_steady
        # Reward on absolute e-factor (only meaningful when reward_on_steady is False).
        self.reward_on_absolute_efactor = reward_on_absolute_efactor
        self.reward_on_actions_penalty = reward_on_actions_penalty
        # When input actions fall outside [min_actions, max_actions], reject them
        # and end the episode immediately.
        self.reward_on_reject_actions = reward_on_reject_actions
        self.relaxed_max_min_actions = relaxed_max_min_actions  # assumes uss_subtracted == False
        # Select what to include in observations. The full layout is:
        # t, U1..U4, USS1..USS4, ESS, E, KF_X1, KF_X2, Z1..Z30, X1..X1694
        self.observation_include_t = observation_include_t  # 1
        self.observation_include_action = observation_include_action  # 4
        self.observation_include_uss = observation_include_uss  # 4
        self.observation_include_ess = observation_include_ess  # yss, 1
        self.observation_include_e = observation_include_e  # y, efactor, 1
        self.observation_include_kf = observation_include_kf  # after kalman filter, 2
        self.observation_include_z = observation_include_z  # 30
        self.observation_include_x = observation_include_x  # 1694
        self.x0 = self._as_initial_state(x0_loc, 'x0_loc')  # initial differential states [0,50]
        self.z0 = self._as_initial_state(z0_loc, 'z0_loc')  # initial algebraic states [0,50]
        # Pre-identified model used by the Kalman-filter state estimator.
        self.model_preconfig = np.load(model_loc, allow_pickle=True)
        # References for a fixed batch.
        self.ur = INPUT_REFS  # reference inputs
        self.yr = OUTPUT_REF  # reference output
        self.num_sim = int(SIM_TIME / dt)  # number of steps in one fixed batch (SIM_TIME hours)
        self.observation_dim = (
            1 * self.observation_include_t
            + 4 * self.observation_include_action
            + 4 * self.observation_include_uss
            + 1 * self.observation_include_ess
            + 1 * self.observation_include_e
            + 2 * self.observation_include_kf
            + 30 * self.observation_include_z
            + 1694 * self.observation_include_x
        )
        max_observations = []
        if self.observation_include_t:
            max_observations.append(np.ones(1, dtype=np.float32) * 100.0)  # by convention
        if self.observation_include_action:
            max_observations.append(np.ones(4, dtype=np.float32) * 1.0)  # by convention
        if self.observation_include_uss:
            max_observations.append(np.ones(4, dtype=np.float32) * 0.5)  # from dataset
        if self.observation_include_ess:
            max_observations.append(np.ones(1, dtype=np.float32) * 15.0)  # from dataset
        if self.observation_include_e:
            max_observations.append(np.ones(1, dtype=np.float32) * 20.0)  # from dataset
        if self.observation_include_kf:
            max_observations.append(np.ones(2, dtype=np.float32) * 0.05)  # from dataset
        if self.observation_include_z:
            max_observations.append(np.ones(30, dtype=np.float32) * 0.5)  # by convention
        if self.observation_include_x:
            max_observations.append(np.ones(1694, dtype=np.float32) * 50.0)  # by convention
        if not max_observations:
            # ValueError is an Exception subclass, so existing callers are unaffected.
            raise ValueError("observations must contain something! Need at least one array to concatenate")
        self.max_observations = np.concatenate(max_observations)
        self.min_observations = np.zeros(self.observation_dim, dtype=np.float32)
        if not self.uss_subtracted:
            # Absolute input bounds (actions are the raw inputs).
            self.max_actions = np.array([0.408, 0.125, 0.392, 0.214], dtype=np.float32)  # from dataset
            self.min_actions = np.array([0.4075, 0.105, 0.387, 0.208], dtype=np.float32)  # from dataset
            if self.relaxed_max_min_actions:
                self.max_actions = np.array([0.5, 0.2, 0.5, 0.4], dtype=np.float32)  # from dataset
                self.min_actions = np.array([0.3, 0.0, 0.2, 0.1], dtype=np.float32)  # from dataset
        else:
            # Delta bounds (actions are offsets around USS).
            self.max_actions = np.array([1.92476206e-05, 1.22118426e-02, 1.82154982e-03, 3.59729230e-04],
                                        dtype=np.float32)  # from dataset
            self.min_actions = np.array([-0.00015742, -0.00146234, -0.00021812, -0.00300454],
                                        dtype=np.float32)  # from dataset
            if self.relaxed_max_min_actions:
                self.max_actions = np.array([2.0e-05, 1.3e-02, 2.0e-03, 4.0e-04], dtype=np.float32)  # from dataset
                self.min_actions = np.array([-0.00016, -0.0015, -0.00022, -0.00301], dtype=np.float32)  # from dataset
        if self.normalize:
            # Normalized spaces: everything mapped to [-1, 1].
            self.observation_space = spaces.Box(low=-1, high=1, shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=-1, high=1, shape=(self.action_dim,))
        else:
            self.observation_space = spaces.Box(low=self.min_observations, high=self.max_observations,
                                                shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=self.min_actions, high=self.max_actions, shape=(self.action_dim,))
        self.plant = Plant(ND1, ND2, ND3, V1, V2, V3, V4, dt)
        self.yss = self.plant.calculate_Efactor(DM(self.z0))  # steady-state output (e-factor)
[docs] def reset(self):
self.U = [] # inputs
self.Y = []
self.zk = DM(self.z0) # 30
self.xk = self.plant.mix_and_get_initial_condition(self.x0, USS)[0] # 1694
self.t = 0
self.previous_efactor = self.yss # steady state output, e-factor, the small the better
observations = []
if self.observation_include_t:
observations.append(np.array([self.t], dtype=np.float32))
if self.observation_include_action:
observations.append(np.zeros(4, dtype=np.float32))
if self.observation_include_uss:
observations.append(USS)
if self.observation_include_ess:
observations.append(np.array([self.yss], dtype=np.float32))
if self.observation_include_e:
observations.append(np.array([self.previous_efactor], dtype=np.float32))
if self.observation_include_kf:
self.xe = np.zeros(2, dtype=np.float32)
observations.append(self.xe)
if self.observation_include_z:
observations.append(self.zk.full().flatten())
if self.observation_include_x:
observations.append(self.xk.full().flatten())
try:
observation = np.concatenate(observations)
except ValueError:
raise Exception("observations must contain something! Need at least one array to concatenate")
if self.normalize:
observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
return observation
def _step(self, action):
if self.max_steps == -1:
done = False
else:
done = (self.t >= self.max_steps - 1)
action = np.array(action, dtype=np.float32)
if self.normalize:
action, _, _ = denormalize_spaces(action, self.max_actions, self.min_actions)
if self.reward_on_reject_actions:
if (action > self.max_actions).any() or (action < self.min_actions).any():
reward = -100000.0
done = True
observation = np.zeros(self.observation_dim, dtype=np.float32)
return observation, reward, done, {"efactor": 100000.0, "previous_efactor": self.previous_efactor,
"reward_on_steady": reward, "reward_on_absolute_efactor": reward,
"reward_on_efactor_diff": reward}
if self.uss_subtracted:
uk = [
action[0] + USS[0],
action[1] + USS[1],
action[2] + USS[2],
action[3] + USS[3]
]
else:
uk = action
self.U.append(uk)
_, xnext, znext = self.plant.simulate(self.xk, self.zk, uk)
efactor = self.plant.calculate_Efactor(znext)
self.Y.append(efactor)
reward_on_steady = -abs(efactor - self.yss)
reward_on_absolute_efactor = -abs(efactor)
reward_on_efactor_diff = self.previous_efactor - efactor
previous_efactor = self.previous_efactor
if self.reward_on_ess_subtracted:
reward = self.yss - efactor # efactor the smaller the better
elif self.reward_on_steady:
reward = reward_on_steady
else:
if self.reward_on_absolute_efactor:
reward = reward_on_absolute_efactor
else:
reward = reward_on_efactor_diff
reward += np.linalg.norm(action * self.reward_on_actions_penalty, ord=2)
self.previous_efactor = efactor
self.xk = xnext
self.zk = znext
observations = []
if self.observation_include_t:
observations.append(np.array([self.t], dtype=np.float32))
if self.observation_include_action:
observations.append(action)
if self.observation_include_uss:
observations.append(USS)
if self.observation_include_ess:
observations.append(np.array([self.yss], dtype=np.float32))
if self.observation_include_e:
observations.append(np.array([self.previous_efactor],
dtype=np.float32)) # !!!!!!!also, shall I run a step here? how do we align?
if self.observation_include_kf:
xe = state_estimator(
self.xe, uk, efactor - self.yss, # self.Xhat[k] is previous step xe
self.model_preconfig[0], self.model_preconfig[1],
self.model_preconfig[2], self.model_preconfig[4]
)
observations.append(xe)
if self.observation_include_z:
observations.append(self.zk.full().flatten())
if self.observation_include_x:
observations.append(self.xk.full().flatten())
try:
observation = np.concatenate(observations)
except ValueError:
raise Exception("observations must contain something! Need at least one array to concatenate")
if self.normalize:
observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
self.t += 1
return observation, reward, done, {"efactor": efactor, "previous_efactor": previous_efactor,
"reward_on_steady": reward_on_steady,
"reward_on_absolute_efactor": reward_on_absolute_efactor,
"reward_on_efactor_diff": reward_on_efactor_diff}
# state, reward, done, info in gym env term
[docs] def step(self, action):
try:
return self._step(action)
except Exception:
reward = -100000.0
done = True
observation = np.zeros(self.observation_dim, dtype=np.float32)
return observation, reward, done, {"efactor": 100000.0, "previous_efactor": self.previous_efactor,
"reward_on_steady": reward, "reward_on_absolute_efactor": reward,
"reward_on_efactor_diff": reward}
[docs] def plot(self, show=False, efactor_fig_name=None, input_fig_name=None):
target_efactor = [self.yss + self.yr] * self.num_sim
target_inputs = [USS + self.ur] * self.num_sim
U = np.array(self.U) * 1000 # scale the solution to micro Litres
target_inputs = np.array(target_inputs) * 1000 # scale the solution to micro Litres
local_t = [k * dt for k in range(self.num_sim)]
# plots
plt.close("all")
plt.figure(0)
plt.plot(local_t, self.Y, label='Real Output')
plt.plot(local_t, target_efactor, linestyle="--", label='Steady State Output')
plt.xlabel('Time [min]')
plt.ylabel('E-Factor [A.U.]')
plt.legend()
plt.grid()
if efactor_fig_name is not None:
plt.savefig(efactor_fig_name)
plt.tight_layout()
# create figure (fig), and array of axes (ax)
fig, axs = plt.subplots(nrows=2, ncols=2)
axs[0, 0].step(local_t, U[:, 0], where='post', label='Real Input')
axs[0, 0].step(local_t, target_inputs[:, 0], where='post', linestyle="--", label='Steady State Input')
axs[0, 0].set_ylabel(u'U1 [\u03bcL/min]')
axs[0, 0].set_xlabel('time [min]')
axs[0, 0].grid()
axs[0, 1].step(local_t, U[:, 1], where='post', label='Real Input')
axs[0, 1].step(local_t, target_inputs[:, 1], where='post', linestyle="--", label='Steady State Input')
axs[0, 1].set_ylabel(u'U2 [\u03bcL/min]')
axs[0, 1].set_xlabel('time [min]')
axs[0, 1].grid()
axs[1, 0].step(local_t, U[:, 2], where='post', label='Real Input')
axs[1, 0].step(local_t, target_inputs[:, 2], where='post', linestyle="--", label='Steady State Input')
axs[1, 0].set_ylabel(u'U3 [\u03bcL/min]')
axs[1, 0].set_xlabel('time [min]')
axs[1, 0].grid()
axs[1, 1].step(local_t, U[:, 3], where='post', label='Real Input')
axs[1, 1].step(local_t, target_inputs[:, 3], where='post', linestyle="--", label='Steady State Input')
axs[1, 1].set_ylabel(u'U4 [\u03bcL/min]')
axs[1, 1].set_xlabel('time [min]')
axs[1, 1].legend()
plt.tight_layout()
plt.grid()
if input_fig_name is not None:
plt.savefig(input_fig_name)
if show:
plt.show()
else:
plt.close()