Tutorial - PPO from Scratch

Goal

After this tutorial, you will be able to implement and train PPO on any Gymnasium environment with a discrete action space (the exercises at the end extend it to continuous control). You will write ~150 lines of PPO implementation in PyTorch and train it on CartPole and LunarLander.

Prerequisites: Actor-Critic and PPO, RL Fundamentals, PyTorch basics.

Time: 90-120 minutes.

Step 1: Set up the environment

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical
import matplotlib.pyplot as plt
 
# Start with CartPole -- discrete actions, simple dynamics
env = gym.make("CartPole-v1")
obs_dim = env.observation_space.shape[0]   # 4
n_actions = env.action_space.n              # 2
print(f"Obs dim: {obs_dim}, Actions: {n_actions}")
 
# Test the environment
obs, info = env.reset()
print(f"Initial observation: {obs}")
obs, reward, term, trunc, info = env.step(0)
print(f"After action 0: obs={obs}, reward={reward}")

Step 2: Build actor and critic networks

The actor outputs action probabilities. The critic outputs a scalar value estimate.

class ActorCritic(nn.Module):
    """Shared backbone with separate actor/critic heads."""
 
    def __init__(self, obs_dim, n_actions, hidden=64):
        super().__init__()
        # Shared feature extractor
        self.shared = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.Tanh(),
            nn.Linear(hidden, hidden),
            nn.Tanh(),
        )
        # Actor head: action probabilities
        self.actor = nn.Linear(hidden, n_actions)
        # Critic head: state value
        self.critic = nn.Linear(hidden, 1)
 
    def forward(self, obs):
        features = self.shared(obs)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value.squeeze(-1)
 
    def get_action(self, obs):
        """Sample action, return action, log_prob, value."""
        logits, value = self.forward(obs)
        dist = Categorical(logits=logits)
        action = dist.sample()
        return action.item(), dist.log_prob(action), value
 
    def evaluate(self, obs, actions):
        """Evaluate given actions (for PPO update).
        Returns: log_probs, values, entropy.
        """
        logits, values = self.forward(obs)
        dist = Categorical(logits=logits)
        log_probs = dist.log_prob(actions)
        entropy = dist.entropy()
        return log_probs, values, entropy
 
model = ActorCritic(obs_dim, n_actions)
print(f"Parameters: {sum(p.numel() for p in model.parameters()):,}")

What just happened: We built a neural network with two heads. The shared layers learn features useful for both action selection and value estimation. Tanh activations are common in RL (bounded outputs, smooth gradients). The actor outputs logits (unnormalized log-probabilities), which Categorical converts to a proper distribution.
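
To see that conversion concretely, here is a tiny illustrative snippet (the numbers are arbitrary; it uses the imports from Step 1):

# Categorical applies softmax to the logits internally:
logits = torch.tensor([2.0, 0.0])
dist = Categorical(logits=logits)
print(dist.probs)                      # tensor([0.8808, 0.1192])
print(dist.log_prob(torch.tensor(0)))  # tensor(-0.1269) = log(0.8808)
print(dist.entropy())                  # tensor(0.3653)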

Step 3: Implement rollout collection

Collect experience by running the policy in the environment.

class RolloutBuffer:
    """Store one rollout of experience."""
 
    def __init__(self):
        self.obs = []
        self.actions = []
        self.log_probs = []
        self.rewards = []
        self.values = []
        self.dones = []
 
    def store(self, obs, action, log_prob, reward, value, done):
        self.obs.append(obs)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.values.append(value)
        self.dones.append(done)
 
    def clear(self):
        self.__init__()
 
    def to_tensors(self):
        return {
            "obs": torch.FloatTensor(np.array(self.obs)),
            "actions": torch.LongTensor(self.actions),
            "log_probs": torch.stack(self.log_probs).detach(),
            "rewards": np.array(self.rewards, dtype=np.float32),
            "values": torch.stack(self.values).detach().numpy(),
            "dones": np.array(self.dones, dtype=np.float32),
        }
 
def collect_rollout(env, model, n_steps=2048):
    """Collect n_steps of experience."""
    buffer = RolloutBuffer()
    obs, _ = env.reset()
    episode_rewards = []
    current_ep_reward = 0
 
    for _ in range(n_steps):
        obs_tensor = torch.FloatTensor(obs)
        with torch.no_grad():
            action, log_prob, value = model.get_action(obs_tensor)
 
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
 
        buffer.store(obs, action, log_prob, reward, value, float(done))
        current_ep_reward += reward
 
        if done:
            episode_rewards.append(current_ep_reward)
            current_ep_reward = 0
            obs, _ = env.reset()
        else:
            obs = next_obs
 
    # Bootstrap value for last state
    with torch.no_grad():
        _, last_value = model(torch.FloatTensor(obs))
        last_value = last_value.item()
 
    return buffer, last_value, episode_rewards

What just happened: We ran the policy for 2048 steps, storing every (observation, action, log_prob, reward, value, done) tuple. This is the “rollout phase.” The last_value is needed to bootstrap the GAE computation for the final partial episode.
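
One subtlety: collect_rollout treats truncation (the time-limit cutoff) exactly like termination, which slightly biases the value targets, since a truncated state is not actually terminal. A hedged sketch of the standard fix, similar to stable-baselines3's time-limit handling (it assumes the variables from the rollout loop above, with gamma hardcoded for brevity):

# Inside the rollout loop, before buffer.store(...):
# a truncated episode did not really end, so fold the bootstrapped
# value of the next state into the final reward.
if truncated and not terminated:
    with torch.no_grad():
        _, v_next = model(torch.FloatTensor(next_obs))
    reward += 0.99 * v_next.item()  # 0.99 = gamma; pass it as an argument in real code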

Step 4: Compute GAE advantages

GAE accumulates discounted TD errors: A_t = δ_t + γλ·A_{t+1}, where δ_t = r_t + γ·V(s_{t+1}) − V(s_t). The λ parameter trades bias for variance: λ=0 gives one-step TD errors (low variance, biased by the critic), while λ=1 recovers Monte Carlo returns minus the value baseline (unbiased, high variance).

def compute_gae(rewards, values, dones, last_value, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation.
    Returns: advantages, returns (both as numpy arrays).
    """
    n_steps = len(rewards)
    advantages = np.zeros(n_steps, dtype=np.float32)
    last_gae = 0.0
 
    # Append bootstrapped value
    values_ext = np.append(values, last_value)
 
    for t in reversed(range(n_steps)):
        next_non_terminal = 1.0 - dones[t]
        delta = rewards[t] + gamma * values_ext[t + 1] * next_non_terminal - values_ext[t]
        advantages[t] = delta + gamma * lam * next_non_terminal * last_gae
        last_gae = advantages[t]
 
    returns = advantages + values
    return advantages, returns
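
A quick sanity check with made-up numbers (a small sketch using the function above): with lam=0, each advantage should reduce to the one-step TD error δ_t.

# lam=0: advantage = delta = r + gamma*V(s') - V(s) = 1 + 0.99*0.5 - 0.5 = 0.995
rewards = np.array([1.0, 1.0, 1.0], dtype=np.float32)
values = np.array([0.5, 0.5, 0.5], dtype=np.float32)
dones = np.zeros(3, dtype=np.float32)
adv, ret = compute_gae(rewards, values, dones, last_value=0.5, gamma=0.99, lam=0.0)
print(adv)  # [0.995 0.995 0.995]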

Step 5: Implement PPO loss

The core of PPO is the clipped surrogate objective, plus a value loss and an entropy bonus. In symbols, with probability ratio r_t(θ) = π_θ(a_t|s_t) / π_θold(a_t|s_t):

L_CLIP(θ) = E_t[ min( r_t(θ)·Â_t, clip(r_t(θ), 1−ε, 1+ε)·Â_t ) ]

def ppo_update(model, optimizer, buffer_tensors, advantages, returns,
               clip_eps=0.2, vf_coef=0.5, ent_coef=0.01, n_epochs=4,
               batch_size=64, max_grad_norm=0.5):
    """PPO update with minibatch SGD."""
    obs = buffer_tensors["obs"]
    actions = buffer_tensors["actions"]
    old_log_probs = buffer_tensors["log_probs"]
 
    advantages_t = torch.FloatTensor(advantages)
    returns_t = torch.FloatTensor(returns)
 
    # Normalize advantages (crucial for stability)
    advantages_t = (advantages_t - advantages_t.mean()) / (advantages_t.std() + 1e-8)
 
    n_samples = len(obs)
    total_policy_loss = 0
    total_value_loss = 0
    total_entropy = 0
    n_updates = 0
 
    for epoch in range(n_epochs):
        # Random minibatch indices
        indices = np.random.permutation(n_samples)
 
        for start in range(0, n_samples, batch_size):
            end = start + batch_size
            idx = indices[start:end]
 
            # Get current model outputs for this batch
            new_log_probs, values, entropy = model.evaluate(
                obs[idx], actions[idx]
            )
 
            # Policy loss (clipped surrogate)
            ratio = torch.exp(new_log_probs - old_log_probs[idx])
            surr1 = ratio * advantages_t[idx]
            surr2 = torch.clamp(ratio, 1.0 - clip_eps, 1.0 + clip_eps) * advantages_t[idx]
            policy_loss = -torch.min(surr1, surr2).mean()
 
            # Value loss
            value_loss = nn.functional.mse_loss(values, returns_t[idx])
 
            # Entropy bonus
            entropy_loss = -entropy.mean()
 
            # Total loss
            loss = policy_loss + vf_coef * value_loss + ent_coef * entropy_loss
 
            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
 
            total_policy_loss += policy_loss.item()
            total_value_loss += value_loss.item()
            total_entropy += entropy.mean().item()
            n_updates += 1
 
    return {
        "policy_loss": total_policy_loss / n_updates,
        "value_loss": total_value_loss / n_updates,
        "entropy": total_entropy / n_updates,
    }

What just happened: For each epoch, we shuffle the rollout data into minibatches. For each minibatch, we compute the PPO clipped loss, value loss, and entropy bonus. The advantage normalization is crucial — without it, the scale of advantages varies wildly between rollouts and learning is unstable.
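
A tiny illustration of the clipping (arbitrary numbers, clip_eps=0.2, all advantages positive):

ratio = torch.tensor([0.5, 1.0, 1.5])
adv = torch.tensor([1.0, 1.0, 1.0])
clipped = torch.clamp(ratio, 0.8, 1.2) * adv
print(torch.min(ratio * adv, clipped))  # tensor([0.5000, 1.0000, 1.2000])

For positive advantages, the objective stops rewarding ratio increases beyond 1 + ε (the 1.5 entry is capped at 1.2), while the min keeps the unclipped, more pessimistic value when the ratio has moved the wrong way (the 0.5 entry is not clipped up to 0.8).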

Step 6: Training loop

def train_ppo(env_name="CartPole-v1", total_timesteps=200_000,
              n_steps=2048, n_epochs=4, batch_size=64, lr=3e-4,
              gamma=0.99, gae_lambda=0.95, clip_eps=0.2):
    """Full PPO training loop."""
    env = gym.make(env_name)
    obs_dim = env.observation_space.shape[0]
    n_actions = env.action_space.n
 
    model = ActorCritic(obs_dim, n_actions)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, eps=1e-5)
 
    all_rewards = []
    n_updates_done = 0
    timesteps_done = 0
 
    while timesteps_done < total_timesteps:
        # Collect rollout
        buffer, last_value, ep_rewards = collect_rollout(env, model, n_steps)
        timesteps_done += n_steps
        all_rewards.extend(ep_rewards)
 
        # Compute advantages
        data = buffer.to_tensors()
        advantages, returns = compute_gae(
            data["rewards"], data["values"], data["dones"],
            last_value, gamma, gae_lambda
        )
 
        # PPO update
        losses = ppo_update(
            model, optimizer, data, advantages, returns,
            clip_eps=clip_eps, n_epochs=n_epochs, batch_size=batch_size
        )
 
        n_updates_done += 1
        if n_updates_done % 5 == 0 and len(all_rewards) > 0:
            recent = all_rewards[-20:] if len(all_rewards) >= 20 else all_rewards
            print(f"Update {n_updates_done} | "
                  f"Timesteps: {timesteps_done:,} | "
                  f"Mean reward (last 20 eps): {np.mean(recent):.1f} | "
                  f"Policy loss: {losses['policy_loss']:.4f} | "
                  f"Value loss: {losses['value_loss']:.4f} | "
                  f"Entropy: {losses['entropy']:.4f}")
 
    env.close()
    return model, all_rewards

Step 7: Train on CartPole

model, rewards = train_ppo("CartPole-v1", total_timesteps=200_000)
 
# Plot learning curve
window = 20
smoothed = np.convolve(rewards, np.ones(window)/window, mode="valid")
plt.figure(figsize=(10, 5))
plt.plot(smoothed)
plt.xlabel("Episode")
plt.ylabel(f"Reward ({window}-ep moving average)")
plt.title("PPO on CartPole-v1")
plt.grid(True)
plt.savefig("ppo_cartpole.png", dpi=150)
plt.show()
print(f"Final 20 episodes: {np.mean(rewards[-20:]):.1f} (max 500)")

CartPole should reach 500 (max score) within 50-100k timesteps. If it doesn’t, check:

  • Is advantage normalization enabled?
  • Are log_probs detached correctly in the buffer? (A quick diagnostic follows this list.)
  • Is the learning rate too high (try 1e-4)?
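
If you suspect the buffer wiring, here is a hedged diagnostic sketch (run it right after collect_rollout and before ppo_update, while the model is unchanged since the rollout):

# Re-evaluating the stored (obs, action) pairs should reproduce the
# stored log-probs exactly, i.e. the PPO ratio starts at ~1 everywhere.
data = buffer.to_tensors()
with torch.no_grad():
    new_lp, _, _ = model.evaluate(data["obs"], data["actions"])
print((new_lp - data["log_probs"]).abs().max())  # should be ~0 before any update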

Step 8: Test on LunarLander-v2

LunarLander is harder: 8-dimensional observations, 4 discrete actions, shaped reward. It is a good test of whether your implementation generalizes. (Note: Box2D environments require pip install "gymnasium[box2d]", and on recent Gymnasium releases the registered ID is "LunarLander-v3".)

model_lunar, rewards_lunar = train_ppo(
    "LunarLander-v2",
    total_timesteps=500_000,
    n_steps=2048,
    lr=3e-4,
)
 
# LunarLander: solved at 200+ reward
smoothed = np.convolve(rewards_lunar, np.ones(20)/20, mode="valid")
plt.figure(figsize=(10, 5))
plt.plot(smoothed)
plt.axhline(y=200, color="r", linestyle="--", label="Solved threshold")
plt.xlabel("Episode")
plt.ylabel("Reward (20-ep moving average)")
plt.title("PPO on LunarLander-v2")
plt.legend()
plt.grid(True)
plt.savefig("ppo_lunarlander.png", dpi=150)
plt.show()

Training should reach 200+ reward within 300-500k timesteps with these hyperparameters; exact numbers vary from run to run.

Common bugs and fixes

Symptom                       | Likely cause                      | Fix
------------------------------|-----------------------------------|-------------------------------------
Reward stays flat             | Advantages not normalized         | Add (adv - mean) / (std + eps)
Training explodes (NaN)       | Learning rate too high            | Reduce to 1e-4, check grad clipping
Performance drops suddenly    | Clip epsilon too large            | Reduce from 0.2 to 0.1
Slow learning on LunarLander  | Not enough timesteps per rollout  | Increase n_steps to 4096
High value loss, poor returns | Critic underfitting               | Increase hidden size or n_epochs
Entropy drops to 0 quickly    | Policy collapses to deterministic | Increase ent_coef to 0.02

What you built

The complete PPO algorithm in ~150 lines of functional code:

  1. Actor-Critic network: shared backbone, separate heads
  2. Rollout collection: run policy, store transitions
  3. GAE computation: advantage estimation with bias-variance control
  4. PPO update: clipped surrogate + value + entropy, minibatch SGD
  5. Training loop: collect → compute advantages → update → repeat

This is the same algorithm (simplified) used in stable-baselines3, CleanRL, and production RL systems.

Compare with stable-baselines3

from stable_baselines3 import PPO as SB3_PPO
from stable_baselines3.common.env_util import make_vec_env
 
env = make_vec_env("LunarLander-v2", n_envs=4)
sb3_model = SB3_PPO("MlpPolicy", env, verbose=1)
sb3_model.learn(total_timesteps=500_000)
 
# stable-baselines3 will converge faster (vectorized envs, optimized code)
# but the algorithm is the same

Try this next

  1. Continuous actions: Modify the actor to output the mean and std of a Gaussian (instead of Categorical). Test on LunarLanderContinuous-v2. Key change: replace Categorical with Normal and sum log_prob and entropy over action dimensions (a sketch follows this list).
  2. Vectorized environments: Use gymnasium.make_vec (gymnasium.vector.make on older versions) to run multiple environments in parallel. This collects rollouts faster and provides more diverse experience per update.
  3. Observation normalization: Add running mean/std normalization for observations (see the second sketch below). This helps a lot when observation scales differ (position in meters, angle in radians).
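
A sketch of the Gaussian actor for exercise 1 (illustrative, not part of the tutorial code above; the class name and the state-independent log_std parameterization are my choices, though the latter is common, e.g. in stable-baselines3):

from torch.distributions import Normal

class GaussianActorCritic(nn.Module):
    """Like ActorCritic, but the actor outputs a diagonal Gaussian."""

    def __init__(self, obs_dim, act_dim, hidden=64):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(obs_dim, hidden), nn.Tanh(),
            nn.Linear(hidden, hidden), nn.Tanh(),
        )
        self.mu = nn.Linear(hidden, act_dim)                # action means
        self.log_std = nn.Parameter(torch.zeros(act_dim))   # learned, state-independent
        self.critic = nn.Linear(hidden, 1)

    def forward(self, obs):
        features = self.shared(obs)
        return self.mu(features), self.critic(features).squeeze(-1)

    def get_action(self, obs):
        mu, value = self.forward(obs)
        dist = Normal(mu, self.log_std.exp())
        action = dist.sample()
        # Independent dims: sum log-probs over the action dimension
        return action.numpy(), dist.log_prob(action).sum(-1), value

    def evaluate(self, obs, actions):
        mu, values = self.forward(obs)
        dist = Normal(mu, self.log_std.exp())
        return dist.log_prob(actions).sum(-1), values, dist.entropy().sum(-1)

You would also store actions as a FloatTensor (not LongTensor) in RolloutBuffer.to_tensors.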
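
And a sketch of running observation normalization for exercise 3 (RunningNorm is a hypothetical helper, not from any library; it keeps Welford-style online statistics):

class RunningNorm:
    """Running mean/std normalizer for observations."""

    def __init__(self, shape, eps=1e-8):
        self.mean = np.zeros(shape, dtype=np.float64)
        self.var = np.ones(shape, dtype=np.float64)
        self.count = 0
        self.eps = eps

    def update(self, obs):
        # Welford's online update of mean and (population) variance
        self.count += 1
        delta = obs - self.mean
        self.mean += delta / self.count
        self.var += (delta * (obs - self.mean) - self.var) / self.count

    def __call__(self, obs):
        self.update(obs)
        return (obs - self.mean) / np.sqrt(self.var + self.eps)

Apply it to every observation in collect_rollout (obs = norm(obs) after both reset and step) and reuse the same normalizer at evaluation time. Clipping the normalized observations to roughly [-10, 10] is a common extra safeguard.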