Tutorial - Multi-Agent Training
Goal
After this tutorial, you can train multiple agents in cooperative and competitive settings. You will train agents in PettingZoo environments using independent PPO and parameter sharing, and observe emergent coordination.
Prerequisites: Tutorial - PPO from Scratch, Multi-Agent RL, Actor-Critic and PPO.
Time: 90-120 minutes.
Step 1: Install and explore PettingZoo
# pip install pettingzoo[mpe]
from pettingzoo.mpe import simple_spread_v3, simple_tag_v3
import numpy as np
# Cooperative: 3 agents must cover 3 landmarks
env = simple_spread_v3.parallel_env(N=3, max_cycles=100, continuous_actions=False)
observations, infos = env.reset(seed=42)
print(f"Agents: {env.agents}")
print(f"Number of agents: {len(env.agents)}")
for agent in env.agents:
obs_space = env.observation_space(agent)
act_space = env.action_space(agent)
print(f" {agent}: obs={obs_space.shape}, actions={act_space.n}")
# Run one episode with random actions
total_rewards = {a: 0.0 for a in env.agents}
obs, _ = env.reset()
for step in range(100):
actions = {agent: env.action_space(agent).sample() for agent in env.agents}
obs, rewards, terms, truncs, infos = env.step(actions)
for a in env.agents:
total_rewards[a] += rewards[a]
if all(terms.values()) or all(truncs.values()):
break
print(f"\nRandom policy rewards: {total_rewards}")
# Random agents perform poorly -- they need to learn coordinationWhat just happened: PettingZoo’s parallel API lets all agents act simultaneously. simple_spread is cooperative: all agents share the same reward based on how well the landmarks are covered. Random agents wander aimlessly.
Step 2: Understand the environment
# simple_spread observation structure:
# Each agent sees:
# - its own velocity (2d)
# - its own position (2d)
# - relative positions of all landmarks (N*2d)
# - relative positions of other agents ((N-1)*2d)
# Reward: negative sum of distances from each landmark to its nearest agent
# All agents get the same reward (fully cooperative)
# Actions (discrete): [no_action, move_left, move_right, move_down, move_up]
# The optimal strategy: each agent goes to a different landmark
# This requires implicit coordination -- no agent "owns" a landmarkStep 3: Independent PPO — each agent has its own policy
The simplest multi-agent approach: each agent runs PPO independently.
import torch
import torch.nn as nn
from torch.distributions import Categorical
class PPOAgent:
"""Single-agent PPO, used independently per agent in MARL."""
def __init__(self, obs_dim, n_actions, lr=3e-4, hidden=64):
self.model = nn.Sequential(
nn.Linear(obs_dim, hidden), nn.Tanh(),
nn.Linear(hidden, hidden), nn.Tanh(),
)
self.actor = nn.Linear(hidden, n_actions)
self.critic = nn.Linear(hidden, 1)
self.params = list(self.model.parameters()) + \
list(self.actor.parameters()) + \
list(self.critic.parameters())
self.optimizer = torch.optim.Adam(self.params, lr=lr)
self.buffer_obs = []
self.buffer_actions = []
self.buffer_log_probs = []
self.buffer_rewards = []
self.buffer_values = []
self.buffer_dones = []
def get_action(self, obs):
obs_t = torch.FloatTensor(obs)
with torch.no_grad():
features = self.model(obs_t)
logits = self.actor(features)
value = self.critic(features).item()
dist = Categorical(logits=logits)
action = dist.sample()
return action.item(), dist.log_prob(action).item(), value
def store(self, obs, action, log_prob, reward, value, done):
self.buffer_obs.append(obs)
self.buffer_actions.append(action)
self.buffer_log_probs.append(log_prob)
self.buffer_rewards.append(reward)
self.buffer_values.append(value)
self.buffer_dones.append(done)
def compute_gae(self, last_value, gamma=0.99, lam=0.95):
rewards = np.array(self.buffer_rewards)
values = np.array(self.buffer_values)
dones = np.array(self.buffer_dones)
T = len(rewards)
advantages = np.zeros(T)
last_gae = 0.0
values_ext = np.append(values, last_value)
for t in reversed(range(T)):
non_terminal = 1.0 - dones[t]
delta = rewards[t] + gamma * values_ext[t+1] * non_terminal - values_ext[t]
advantages[t] = delta + gamma * lam * non_terminal * last_gae
last_gae = advantages[t]
returns = advantages + values
return advantages, returns
def update(self, last_value, clip_eps=0.2, n_epochs=4, batch_size=64):
advantages, returns = self.compute_gae(last_value)
obs_t = torch.FloatTensor(np.array(self.buffer_obs))
actions_t = torch.LongTensor(self.buffer_actions)
old_log_probs_t = torch.FloatTensor(self.buffer_log_probs)
advantages_t = torch.FloatTensor(advantages)
returns_t = torch.FloatTensor(returns)
# Normalize advantages
advantages_t = (advantages_t - advantages_t.mean()) / (advantages_t.std() + 1e-8)
n = len(obs_t)
for _ in range(n_epochs):
idx = np.random.permutation(n)
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
b = idx[start:end]
features = self.model(obs_t[b])
logits = self.actor(features)
values = self.critic(features).squeeze(-1)
dist = Categorical(logits=logits)
new_log_probs = dist.log_prob(actions_t[b])
entropy = dist.entropy()
ratio = torch.exp(new_log_probs - old_log_probs_t[b])
surr1 = ratio * advantages_t[b]
surr2 = torch.clamp(ratio, 1-clip_eps, 1+clip_eps) * advantages_t[b]
policy_loss = -torch.min(surr1, surr2).mean()
value_loss = nn.functional.mse_loss(values, returns_t[b])
loss = policy_loss + 0.5 * value_loss - 0.01 * entropy.mean()
self.optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.params, 0.5)
self.optimizer.step()
# Clear buffer
self.buffer_obs.clear()
self.buffer_actions.clear()
self.buffer_log_probs.clear()
self.buffer_rewards.clear()
self.buffer_values.clear()
self.buffer_dones.clear()Training loop
def train_independent_ppo(env_fn, n_episodes=2000, n_steps_per_update=256):
"""Train independent PPO agents."""
env = env_fn()
obs, _ = env.reset()
# Create one PPO agent per environment agent
agents = {}
for agent_name in env.agents:
obs_dim = env.observation_space(agent_name).shape[0]
n_actions = env.action_space(agent_name).n
agents[agent_name] = PPOAgent(obs_dim, n_actions)
episode_rewards = []
steps_collected = 0
for ep in range(n_episodes):
obs, _ = env.reset()
ep_reward = {a: 0.0 for a in env.agents}
done = False
while not done:
actions = {}
for agent_name in env.agents:
action, log_prob, value = agents[agent_name].get_action(obs[agent_name])
actions[agent_name] = action
agents[agent_name].store(
obs[agent_name], action, log_prob, 0.0, value, 0.0
)
next_obs, rewards, terms, truncs, infos = env.step(actions)
done = all(terms.values()) or all(truncs.values())
# Store rewards and dones
for agent_name in env.agents:
agents[agent_name].buffer_rewards[-1] = rewards[agent_name]
agents[agent_name].buffer_dones[-1] = float(
terms[agent_name] or truncs[agent_name]
)
ep_reward[agent_name] += rewards[agent_name]
obs = next_obs
steps_collected += 1
# Update when enough steps collected
if steps_collected >= n_steps_per_update:
for agent_name in env.agents:
last_obs = torch.FloatTensor(obs[agent_name])
with torch.no_grad():
feat = agents[agent_name].model(last_obs)
last_val = agents[agent_name].critic(feat).item()
agents[agent_name].update(last_val)
steps_collected = 0
mean_reward = np.mean(list(ep_reward.values()))
episode_rewards.append(mean_reward)
if ep % 100 == 0:
recent = episode_rewards[-50:] if len(episode_rewards) >= 50 else episode_rewards
print(f"Episode {ep}: mean reward = {np.mean(recent):.2f}")
env.close()
return agents, episode_rewards
# Train
env_fn = lambda: simple_spread_v3.parallel_env(N=3, max_cycles=100, continuous_actions=False)
agents, rewards = train_independent_ppo(env_fn, n_episodes=2000)Step 4: Parameter sharing — one policy for all agents
When agents are homogeneous (same type, same observation/action space), share one policy.
def train_shared_ppo(env_fn, n_episodes=2000, n_steps_per_update=256):
"""Train with shared parameters -- one policy for all agents."""
env = env_fn()
obs, _ = env.reset()
# Single shared agent (policy shared across all)
agent_name = env.agents[0]
obs_dim = env.observation_space(agent_name).shape[0]
n_actions = env.action_space(agent_name).n
shared_agent = PPOAgent(obs_dim, n_actions)
episode_rewards = []
steps_collected = 0
# Separate buffers per agent but shared model
agent_buffers = {a: {"obs": [], "actions": [], "log_probs": [],
"rewards": [], "values": [], "dones": []}
for a in env.agents}
for ep in range(n_episodes):
obs, _ = env.reset()
ep_reward = {a: 0.0 for a in env.agents}
done = False
while not done:
actions = {}
for a in env.agents:
action, log_prob, value = shared_agent.get_action(obs[a])
actions[a] = action
agent_buffers[a]["obs"].append(obs[a])
agent_buffers[a]["actions"].append(action)
agent_buffers[a]["log_probs"].append(log_prob)
agent_buffers[a]["values"].append(value)
next_obs, rewards, terms, truncs, infos = env.step(actions)
done = all(terms.values()) or all(truncs.values())
for a in env.agents:
agent_buffers[a]["rewards"].append(rewards[a])
agent_buffers[a]["dones"].append(float(terms[a] or truncs[a]))
ep_reward[a] += rewards[a]
obs = next_obs
steps_collected += 1
if steps_collected >= n_steps_per_update:
# Aggregate all agents' data into shared agent's buffer
for a in env.agents:
for i in range(len(agent_buffers[a]["obs"])):
shared_agent.store(
agent_buffers[a]["obs"][i],
agent_buffers[a]["actions"][i],
agent_buffers[a]["log_probs"][i],
agent_buffers[a]["rewards"][i],
agent_buffers[a]["values"][i],
agent_buffers[a]["dones"][i],
)
agent_buffers[a] = {"obs": [], "actions": [], "log_probs": [],
"rewards": [], "values": [], "dones": []}
last_obs = torch.FloatTensor(obs[env.agents[0]])
with torch.no_grad():
feat = shared_agent.model(last_obs)
last_val = shared_agent.critic(feat).item()
shared_agent.update(last_val)
steps_collected = 0
mean_reward = np.mean(list(ep_reward.values()))
episode_rewards.append(mean_reward)
if ep % 100 == 0:
recent = episode_rewards[-50:] if len(episode_rewards) >= 50 else episode_rewards
print(f"Episode {ep} (shared): mean reward = {np.mean(recent):.2f}")
env.close()
return shared_agent, episode_rewardsWhat just happened: With parameter sharing, all agents use the same neural network. Each agent still gets its own observation, but the policy is shared. This means 3x more training data for the same number of environment steps. Convergence should be faster.
Step 5: Add communication
Let agents share a simple observation summary with each other.
class CommunicatingPPOAgent:
"""PPO agent that broadcasts a message and receives others' messages."""
def __init__(self, obs_dim, n_actions, msg_dim=8, n_other_agents=2, hidden=64, lr=3e-4):
self.msg_dim = msg_dim
input_dim = obs_dim + msg_dim * n_other_agents # obs + received messages
self.model = nn.Sequential(
nn.Linear(input_dim, hidden), nn.Tanh(),
nn.Linear(hidden, hidden), nn.Tanh(),
)
self.actor = nn.Linear(hidden, n_actions)
self.critic = nn.Linear(hidden, 1)
self.msg_encoder = nn.Linear(obs_dim, msg_dim) # generate message from obs
self.params = list(self.model.parameters()) + \
list(self.actor.parameters()) + \
list(self.critic.parameters()) + \
list(self.msg_encoder.parameters())
self.optimizer = torch.optim.Adam(self.params, lr=lr)
def get_message(self, obs):
"""Generate message to broadcast to other agents."""
obs_t = torch.FloatTensor(obs)
with torch.no_grad():
msg = torch.tanh(self.msg_encoder(obs_t))
return msg.numpy()
def get_action(self, obs, received_messages):
"""Select action given own observation + received messages."""
# Concatenate obs with all received messages
combined = np.concatenate([obs] + received_messages)
combined_t = torch.FloatTensor(combined)
with torch.no_grad():
features = self.model(combined_t)
logits = self.actor(features)
value = self.critic(features).item()
dist = Categorical(logits=logits)
action = dist.sample()
return action.item(), dist.log_prob(action).item(), valueStep 6: Competitive environment
from pettingzoo.mpe import simple_tag_v3
# Competitive: predators chase prey
# Predators (3) get reward for catching prey
# Prey (1) gets negative reward when caught, positive for evading
env = simple_tag_v3.parallel_env(
num_good=1, # prey
num_adversaries=3, # predators
num_obstacles=2,
max_cycles=100,
continuous_actions=False,
)
obs, _ = env.reset()
print(f"Agents: {env.agents}")
# adversary_0, adversary_1, adversary_2 (predators), agent_0 (prey)
# Train predators cooperatively, prey independently
# This naturally creates an arms race: better predators → better prey → ...Step 7: Measure emergent behavior
def analyze_coordination(env_fn, agent, n_eval_episodes=100):
"""Measure whether agents learned to coordinate.
For simple_spread: check if agents go to different landmarks.
"""
env = env_fn()
collision_count = 0
coverage_scores = []
for _ in range(n_eval_episodes):
obs, _ = env.reset()
done = False
while not done:
actions = {}
for a in env.agents:
action, _, _ = agent.get_action(obs[a])
actions[a] = action
obs, _, terms, truncs, _ = env.step(actions)
done = all(terms.values()) or all(truncs.values())
# Final positions: check if agents are near different landmarks
# (environment-specific analysis)
# In simple_spread, the reward already captures this
env.close()
print(f"Mean coverage score: {np.mean(coverage_scores):.3f}")
def plot_comparison(rewards_independent, rewards_shared, window=50):
"""Compare independent vs shared parameter training."""
smooth_ind = np.convolve(rewards_independent, np.ones(window)/window, mode="valid")
smooth_sh = np.convolve(rewards_shared, np.ones(window)/window, mode="valid")
plt.figure(figsize=(10, 5))
plt.plot(smooth_ind, label="Independent PPO", alpha=0.8)
plt.plot(smooth_sh, label="Shared PPO", alpha=0.8)
plt.xlabel("Episode")
plt.ylabel(f"Mean reward ({window}-ep avg)")
plt.title("Independent vs Shared Parameter Training")
plt.legend()
plt.grid(True)
plt.savefig("marl_comparison.png", dpi=150)
plt.show()What you built
- Independent PPO: each agent learns its own policy. Simple but ignores coordination opportunities.
- Shared PPO: all agents share one policy. More data-efficient, natural coordination for homogeneous agents.
- Communication: agents learn to broadcast messages. The content of messages emerges from training.
- Competitive training: adversarial dynamics create an arms race.
Try this next
- CTDE with shared critic: Modify the shared PPO to use a centralized critic (takes all agents’ observations concatenated) while keeping decentralized actors. See Multi-Agent RL for the MAPPO architecture.
- Scale to more agents: Run simple_spread with N=5 and N=10 agents. How does performance degrade? Does parameter sharing help more with more agents?
- Role specialization: In simple_tag, do predators specialize (some chase directly, others cut off escape routes)? Visualize individual agent trajectories over an episode to see if roles emerge.
Links
- Multi-Agent RL — theory and frameworks
- Tutorial - PPO from Scratch — single-agent PPO implementation
- Actor-Critic and PPO — PPO algorithm details
- Reward Design and Curriculum — reward design in multi-agent settings
- Case Study - RL System Design — multi-agent pursuit design