Having covered the theory, we now discuss practical implementation using the gymnasium library (formerly OpenAI Gym). Gymnasium provides standard interfaces for reinforcement learning environments, facilitating experimentation with various RL algorithms.
To use Gymnasium environments:
import gymnasium as gym
env = gym.make('<Environment-Name>')
state, info = env.reset()
The environment can be stepped through using:
next_state, reward, terminated, truncated, info = env.step(action)
The Pendulum-v1 environment is a classic control task:
- State: `[\cos\theta, \sin\theta, \dot{\theta}]`
- Action: Continuous torque between `[-2, +2]` N·m
- Reward: `-(\Delta \theta^2 + 0.1*\dot{\theta}^2 + 0.001*\tau^2)`
The gymnasium environment can also be used for demonstration exercises such as using hand-tracking to adjust the torque on the cart.
Actor and Critic Networks (PyTorch Example):
import torch
import torch.nn as nn
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super().__init__()
self.max_action = max_action
self.net = nn.Sequential(
nn.Linear(state_dim, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, action_dim)
)
def forward(self, state):
return self.max_action * torch.tanh(self.net(state))
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, 1)
)
def forward(self, state, action):
return self.net(torch.cat([state, action], dim=1))
Training Steps Outline:
- Initialize environment and replay buffer.
- Collect experiences and perform network updates:
- Critic loss: minimize squared difference from target Q.
- Actor loss: maximize critic Q-values.
- Use soft updates for target networks.
Gymnasium Environments are custom RL setups:
- State: Concentration, temperature.
- Action: Coolant flow rate or other continuous variables.
- Reward: Based on optimization goals (e.g., yield, safety constraints).
Implement by subclassing Gymnasium:
import gymnasium as gym
class CSTREnv(gym.Env):
def __init__(self):
self.observation_space = gym.spaces.Box(low=[0,0], high=[10,500])
self.action_space = gym.spaces.Box(low=[0], high=[1])
# Define reactor dynamics and initial conditions
def reset(self):
# Initialize state
return state, {}
def step(self, action):
# Update state using reactor dynamics equations
# Calculate reward and done status
return next_state, reward, terminated, truncated, {}
Pendulum Problem with DDPG: The Pendulum-v1 environment is a classic control problem: the agent applies a torque to a pendulum (which starts hanging downward) and tries to swing it up and balance it upright. The state is 3-dimensional (angle represented as cos and sin, and angular velocity) and the action is 1-dimensional continuous (torque -2 to +2 N·m). The reward is highest (less negative) when the pendulum is upright and not moving (and using minimal torque).
- Implement a simplified DDPG for Pendulum.
- Build the actor and critic networks, a replay buffer, and the training loop. We’ll use PyTorch in code (though one could use TensorFlow similarly). The code will highlight how to connect to the gymnasium environment and perform learning.
- In practice, use an existing library like Stable Baselines3.
Set up the Pendulum Environment and Networks:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
# Make Pendulum environment
env = gym.make('Pendulum-v1', g=9.81) # g is gravitational acceleration, default 10
state_dim = env.observation_space.shape[0] # dimension of state (should be 3 for Pendulum)
action_dim = env.action_space.shape[0] # dimension of action (1 for Pendulum)
max_action = float(env.action_space.high[0]) # max torque (=2.0)
Import the necessary libraries. Retrieve state_dim, action_dim, and max_action (to help scale outputs). Next, define the neural network models for actor and critic. A simple feedforward network is sufficient for this small problem:
# Actor Network: maps state -> action (within [-max_action, max_action])
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super(Actor, self).__init__()
self.max_action = max_action
# Simple 2-layer MLP
self.net = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)
def forward(self, state):
# Output raw action, then scale to range [-max_action, max_action] using tanh
raw_action = self.net(state)
# bound output action between -1 and 1 via tanh, then scale
action = self.max_action * torch.tanh(raw_action)
return action
# Critic Network: maps (state, action) -> Q-value
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
# Q-network takes state and action concatenated
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, 1)
)
def forward(self, state, action):
# Ensure state and action are concatenated as vectors
if action.dim() == 1:
action = action.unsqueeze(1)
x = torch.cat([state, action], dim=1)
Q = self.net(x)
return Q
The actor outputs a continuous action using a tanh activation to ensure it stays within [-1,1], then multiplies by max_action to scale to [-2,2]. The critic concatenates state and action and outputs a scalar Q-value. Use two hidden layers of 128 units as a reasonable size for this small problem.
Replay Buffer: DDPG relies on a replay buffer to store transitions and sample them for training. Implement a simple ring-buffer:
# Replay Buffer for experience replay
class ReplayBuffer:
def __init__(self, capacity=100000):
self.capacity = capacity
self.buffer = []
self.pos = 0 # position to insert next entry (for circular buffer)
def add(self, state, action, reward, next_state, done):
if len(self.buffer) < self.capacity:
self.buffer.append(None)
self.buffer[self.pos] = (state, action, reward, next_state, done)
# Move position pointer (overwrite oldest if full)
self.pos = (self.pos + 1) % self.capacity
def sample(self, batch_size):
batch = np.random.choice(len(self.buffer), batch_size, replace=False)
states, actions, rewards, next_states, dones = zip(*(self.buffer[i] for i in batch))
# Convert to torch tensors
return (torch.tensor(np.array(states), dtype=torch.float32),
torch.tensor(np.array(actions), dtype=torch.float32),
torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(1),
torch.tensor(np.array(next_states), dtype=torch.float32),
torch.tensor(np.array(dones), dtype=torch.float32).unsqueeze(1))
def __len__(self):
return len(self.buffer)
The buffer stores transitions and samples a random batch for training as tensors.
Initialize DDPG components:
# Initialize actor, critic, target networks and optimizers
actor = Actor(state_dim, action_dim, max_action)
critic = Critic(state_dim, action_dim)
target_actor = Actor(state_dim, action_dim, max_action)
target_critic = Critic(state_dim, action_dim)
# Copy weights from actor to target_actor, and critic to target_critic
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())
target_actor.eval()
target_critic.eval()
actor_optimizer = optim.Adam(actor.parameters(), lr=1e-3)
critic_optimizer = optim.Adam(critic.parameters(), lr=1e-3)
buffer = ReplayBuffer(capacity=100000)
Create the networks and set target networks initially equal to the main networks. Use Adam optimizers for both actor and critic. Create the replay buffer. The learning rates (1e-3) are chosen as typical starting points.
Training Loop: Train the DDPG agent. Here is an outline of the loop and key steps:
import math
num_episodes = 200 # number of episodes to train
batch_size = 64 # batch size for sampling from replay
gamma = 0.99 # discount factor
tau = 0.005 # target network update rate (tau)
exploration_noise = 0.1 # stddev for Gaussian exploration noise
for episode in range(num_episodes):
state, _ = env.reset()
state = state.astype(np.float32)
episode_reward = 0.0
for step in range(500): # max steps per episode (Pendulum typically truncated at 200)
# Select action according to current policy + exploration noise
state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
action = actor(state_tensor).cpu().numpy()[0]
# Add exploration noise (Gaussian)
action = action + np.random.normal(0, exploration_noise * max_action, size=action_dim)
action = np.clip(action, -max_action, max_action)
next_state, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
next_state = next_state.astype(np.float32)
# Store transition in replay buffer
buffer.add(state, action, reward, next_state, done)
state = next_state
episode_reward += reward
# Train the networks if we have enough samples in replay buffer
if len(buffer) >= batch_size:
# Sample a batch
states, actions, rewards, next_states, dones = buffer.sample(batch_size)
# Compute target Q values using target networks
with torch.no_grad():
# Target actor for next action
next_actions = target_actor(next_states)
target_Q = target_critic(next_states, next_actions)
# If done (terminal), no future reward; use (1-done) mask
target_Q = rewards + gamma * (1 - dones) * target_Q
# Critic loss = MSE between current Q and target Q
current_Q = critic(states, actions)
critic_loss = nn.MSELoss()(current_Q, target_Q)
# Update critic
critic_optimizer.zero_grad()
critic_loss.backward()
critic_optimizer.step()
# Actor loss = -mean(Q) (because we want to maximize Q, so minimize -Q)
actor_actions = actor(states)
actor_loss = -critic(states, actor_actions).mean()
# Update actor
actor_optimizer.zero_grad()
actor_loss.backward()
actor_optimizer.step()
# Soft update target networks
for param, target_param in zip(critic.parameters(), target_critic.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
for param, target_param in zip(actor.parameters(), target_actor.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
if done:
break # episode ends
# Logging (print) the cumulative reward of the episode
print(f"Episode {episode+1}: Reward = {episode_reward:.2f}")
A few notes on the code above:
- Loop over episodes. For each episode, reset the environment and then loop for a certain number of steps (use 500 as an upper bound to ensure it doesn't loop forever if the environment doesn’t terminate early; Pendulum naturally truncates around 200 steps by default).
- Action selection: Get the action from the actor network actor(state_tensor) and then add Gaussian noise for exploration. The noise scale decays by exploration_noise * max_action (so here 0.1 * 2.0 = 0.2 std dev initial noise). This encourages exploration of different torques. Clip the action to the allowed range.
- Storing transitions: Store state, action, reward, next_state, done in the ReplayBuffer.
- Training updates: Wait until at least one batch worth of samples is in the buffer, then each time step do one gradient update for actor and critic:
- Compute target Q: using the target networks, compute `y_i = r_i + \gamma (1-d_i)\,Q_{\text{target}}(s_{i+1}, \mu_{\text{target}}(s_{i+1}))`. dones is a 0/1 indicator for terminal states; use (1 - done) to zero-out the future term if the episode ended.
- Critic loss: mean squared error between current_Q = critic(s,a) and target_Q. This corresponds to minimizing `(Q(s,a) - y)^2`.
- Actor loss: maximize the critic’s Q, so minimize -critic(s, actor(s)) (negative sign for gradient descent). This implements the deterministic policy gradient update.
- Target network update: perform a soft update with factor `\tau`. A small `\tau` of 0.005 means the target networks change very slowly, providing a stable reference for targets.
- Print episode rewards for monitoring. Over training, we expect the episode reward (which is negative in Pendulum, but closer to 0 is better since 0 would be perfect upright balance) to increase (become less negative), indicating the pendulum is kept more upright.
This code trains a DDPG agent on the Pendulum problem. With 200 episodes, it should achieve a decent policy (though possibly not perfect; longer training yields better results). In practice, one might incorporate learning rate schedules, noise decay, or more episodes.
Gymnasium Examples for Engineering
- CartPole: Balancing pole, discrete actions, robotics/control theory.
- MountainCar: Illustrates energy management.
- Acrobot: Robotic arm analogy.
- LunarLander: Aerospace engineering control.
- BipedalWalker: Biomechanical control using continuous actions.
- HVAC or Water Resource Management: Custom environments applicable to industrial optimization.
Gymnasium provides a test environment to implement RL algorithms across engineering domains.