引言
在真实世界中,智能体很少独自行动。无论是团队协作(多个机器人共同搬运物体)、市场竞争(自动驾驶车辆在路口博弈)还是混合场景(王者荣耀 5v5 对战),多个智能体的交互构成了更复杂、更真实的决策环境。
多智能体强化学习(Multi-Agent Reinforcement Learning, MARL) 研究如何让多个智能体在共享环境中学习策略,以实现各自或共同的目标。
MARL 的核心挑战:
- 🌐 非平稳性:其他智能体的策略在变化,环境不再是静态的
- 🔄 信用分配:团队奖励如何公平地分配给每个成员?
- 🤝 协调问题:如何学会合作而非各自为政?
- ⚔️ 对抗学习:如何在竞争中找到均衡策略?
本文将系统介绍 MARL 的核心概念、经典算法(IQL、VDN、QMIX、MADDPG、MAPPO)以及实战应用。
1. MARL 基础理论
1.1 从单智能体到多智能体
单智能体 MDP:$\langle \mathcal{S}, \mathcal{A}, P, R, \gamma \rangle$
多智能体 MDP(Stochastic Game): \(\langle N, \mathcal{S}, \{\mathcal{A}^i\}_{i=1}^N, P, \{R^i\}_{i=1}^N, \gamma \rangle\)
其中:
- $N$:智能体数量
- $\mathcal{A}^i$:智能体 $i$ 的动作空间
- $R^i$:智能体 $i$ 的奖励函数
联合动作:$\mathbf{a} = (a^1, a^2, \ldots, a^N)$
| 状态转移:$P(s’ | s, \mathbf{a})$ 依赖于所有智能体的动作 |
1.2 MARL 的分类
按任务类型
| 类型 | 特征 | 示例 |
|---|---|---|
| 完全合作 | 所有智能体共享奖励 | 多机器人协作、StarCraft |
| 完全竞争 | 零和博弈,一方收益=对方损失 | 国际象棋、围棋 |
| 混合场景 | 部分合作+部分竞争 | 王者荣耀(队内合作、队间竞争) |
按信息共享
| 模式 | 训练信息 | 执行信息 | 代表算法 |
|---|---|---|---|
| 完全去中心化 | 局部 | 局部 | IQL |
| CTDE | 全局 | 局部 | VDN, QMIX, MADDPG |
| 中心化 | 全局 | 全局 | 传统规划方法 |
CTDE(Centralized Training with Decentralized Execution):
- 训练阶段:利用全局信息(如其他智能体的动作、全局状态)
- 执行阶段:每个智能体只能访问局部观测
1.3 核心挑战详解
挑战 1:非平稳性(Non-stationarity)
问题:智能体 A 的策略变化会改变智能体 B 的环境动态。
示例:
# 智能体 B 的视角
状态 s_t: [位置, 速度, ...]
动作 a_t: 向右移动
奖励 r_t: +10(成功)
# 但在 t+1000 时刻,智能体 A 改变策略
状态 s_{t+1000}: [相同位置, 相同速度, ...]
动作 a_{t+1000}: 向右移动
奖励 r_{t+1000}: -5(失败,因为 A 的策略变了)
解决方案:
- 使用对手建模(opponent modeling)
- 经验回放时采样多样化策略
- Meta-learning 快速适应
挑战 2:维度灾难
| 联合动作空间:$ | \mathcal{A}^{\text{joint}} | = \prod_{i=1}^N | \mathcal{A}^i | $ |
示例:5 个智能体,每个有 10 个动作 → $10^5 = 100,000$ 种联合动作
解决方案:
- 值函数分解(Value Decomposition)
- Mean Field 近似
- 通信机制减少协调复杂度
挑战 3:信用分配(Credit Assignment)
问题:团队获得 +100 奖励,但智能体 A 出力 80%,智能体 B 只出力 20%,如何公平分配?
解决方案:
- 差分奖励(Difference Rewards)
- Shapley 值
- 可加性值函数分解
2. 独立学习(Independent Learning)
2.1 IQL(Independent Q-Learning)
核心思想:每个智能体独立训练,将其他智能体视为环境的一部分。
算法:每个智能体 $i$ 维护自己的 Q 函数 $Q^i(s, a^i)$,使用标准 Q-learning 更新:
\[Q^i(s, a^i) \leftarrow Q^i(s, a^i) + \alpha \left[ r^i + \gamma \max_{a'^i} Q^i(s', a'^i) - Q^i(s, a^i) \right]\]优点:
- ✅ 实现简单
- ✅ 完全去中心化
- ✅ 可扩展到大量智能体
缺点:
- ❌ 忽略智能体间协调
- ❌ 收敛性无保证(环境非平稳)
- ❌ 可能陷入次优均衡
2.2 IQL 实现(简化示例)
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class DQN(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(DQN, self).__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, x):
return self.net(x)
class IQLAgent:
def __init__(self, agent_id, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0):
self.agent_id = agent_id
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
self.q_network = DQN(state_dim, action_dim)
self.target_network = DQN(state_dim, action_dim)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
self.memory = deque(maxlen=10000)
def select_action(self, state):
"""ε-greedy 策略"""
if random.random() < self.epsilon:
return random.randint(0, self.q_network.net[-1].out_features - 1)
with torch.no_grad():
state = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state)
return q_values.argmax().item()
def store_transition(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def update(self, batch_size=64):
if len(self.memory) < batch_size:
return
# 采样
batch = random.sample(self.memory, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions).unsqueeze(1)
rewards = torch.FloatTensor(rewards).unsqueeze(1)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1)
# 当前 Q 值
q_values = self.q_network(states).gather(1, actions)
# 目标 Q 值
with torch.no_grad():
next_q_values = self.target_network(next_states).max(1, keepdim=True)[0]
target_q_values = rewards + self.gamma * next_q_values * (1 - dones)
# 损失
loss = nn.MSELoss()(q_values, target_q_values)
# 更新
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 衰减 epsilon
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
def update_target_network(self):
self.target_network.load_state_dict(self.q_network.state_dict())
# 多智能体环境训练(伪代码)
def train_iql(env, n_agents, n_episodes=1000):
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agents = [IQLAgent(i, state_dim, action_dim) for i in range(n_agents)]
for episode in range(n_episodes):
states = env.reset() # 返回所有智能体的状态
episode_reward = 0
for step in range(500):
# 每个智能体选择动作
actions = [agent.select_action(states[i]) for i, agent in enumerate(agents)]
# 执行联合动作
next_states, rewards, dones, _ = env.step(actions)
# 存储经验并更新
for i, agent in enumerate(agents):
agent.store_transition(states[i], actions[i], rewards[i], next_states[i], dones[i])
agent.update()
episode_reward += sum(rewards)
states = next_states
if all(dones):
break
# 定期更新目标网络
if episode % 10 == 0:
for agent in agents:
agent.update_target_network()
if episode % 100 == 0:
print(f"Episode {episode}, Total Reward: {episode_reward}")
3. 值函数分解方法
3.1 VDN(Value Decomposition Networks)
核心思想(Sunehag et al., 2018):将全局 Q 函数分解为各智能体 Q 函数的加和。
\[Q_{\text{tot}}(\mathbf{s}, \mathbf{a}) = \sum_{i=1}^N Q^i(o^i, a^i)\]其中 $o^i$ 是智能体 $i$ 的局部观测。
满足条件:
\[\arg\max_{\mathbf{a}} Q_{\text{tot}}(\mathbf{s}, \mathbf{a}) = \begin{pmatrix} \arg\max_{a^1} Q^1(o^1, a^1) \\ \vdots \\ \arg\max_{a^N} Q^N(o^N, a^N) \end{pmatrix}\]优点:
- ✅ 去中心化执行
- ✅ 保证一致性(全局最优=局部最优)
缺点:
- ❌ 表达能力受限(只能加和)
- ❌ 无法表示复杂协调(如互补动作)
3.2 QMIX
突破(Rashid et al., 2018):使用单调混合网络(Monotonic Mixing Network)放松加和限制。
分解形式:
\[Q_{\text{tot}}(\mathbf{s}, \mathbf{a}) = f_{\text{mix}}(Q^1(o^1, a^1), \ldots, Q^N(o^N, a^N); \mathbf{s})\]单调性约束:
\[\frac{\partial Q_{\text{tot}}}{\partial Q^i} \geq 0, \quad \forall i\]网络结构:
局部 Q 值: [Q^1, Q^2, ..., Q^N]
↓
Mixing Network (输入全局状态 s):
- 超网络生成权重(确保非负)
- 加权求和 + 非线性变换
↓
全局 Q 值: Q_tot
实现要点:
class QMixerNetwork(nn.Module):
def __init__(self, n_agents, state_dim, hidden_dim=32):
super(QMixerNetwork, self).__init__()
# 超网络:生成第一层权重
self.hyper_w1 = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, n_agents * hidden_dim)
)
# 超网络:生成第二层权重
self.hyper_w2 = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim)
)
# 偏置项
self.hyper_b1 = nn.Linear(state_dim, hidden_dim)
self.hyper_b2 = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
self.n_agents = n_agents
self.hidden_dim = hidden_dim
def forward(self, agent_qs, state):
"""
agent_qs: [batch, n_agents]
state: [batch, state_dim]
"""
batch_size = agent_qs.size(0)
# 生成权重(确保非负)
w1 = torch.abs(self.hyper_w1(state)) # [batch, n_agents * hidden_dim]
w1 = w1.view(batch_size, self.n_agents, self.hidden_dim)
b1 = self.hyper_b1(state) # [batch, hidden_dim]
# 第一层:加权求和
hidden = torch.bmm(agent_qs.unsqueeze(1), w1).squeeze(1) # [batch, hidden_dim]
hidden = hidden + b1
hidden = torch.relu(hidden)
# 第二层
w2 = torch.abs(self.hyper_w2(state)) # [batch, hidden_dim]
b2 = self.hyper_b2(state) # [batch, 1]
q_tot = torch.sum(hidden * w2, dim=1, keepdim=True) + b2
return q_tot
# QMIX 训练伪代码
class QMIXAgent:
def __init__(self, n_agents, state_dim, action_dim):
# 每个智能体的 Q 网络
self.agent_networks = [DQN(state_dim, action_dim) for _ in range(n_agents)]
# 混合网络
self.mixer = QMixerNetwork(n_agents, state_dim)
# 目标网络
self.target_agent_networks = [DQN(state_dim, action_dim) for _ in range(n_agents)]
self.target_mixer = QMixerNetwork(n_agents, state_dim)
self.optimizer = optim.Adam(
list(self.mixer.parameters()) +
[p for net in self.agent_networks for p in net.parameters()]
)
def compute_loss(self, batch):
states, actions, rewards, next_states, dones = batch
# 当前 Q 值(每个智能体)
agent_qs = []
for i, net in enumerate(self.agent_networks):
q = net(states[:, i]).gather(1, actions[:, i].unsqueeze(1))
agent_qs.append(q)
agent_qs = torch.cat(agent_qs, dim=1) # [batch, n_agents]
# 混合为全局 Q
q_tot = self.mixer(agent_qs, states[:, 0]) # 假设全局状态=第0个智能体状态
# 目标 Q 值
with torch.no_grad():
target_agent_qs = []
for i, net in enumerate(self.target_agent_networks):
q = net(next_states[:, i]).max(1, keepdim=True)[0]
target_agent_qs.append(q)
target_agent_qs = torch.cat(target_agent_qs, dim=1)
target_q_tot = self.target_mixer(target_agent_qs, next_states[:, 0])
target_q_tot = rewards + self.gamma * target_q_tot * (1 - dones)
# 损失
loss = nn.MSELoss()(q_tot, target_q_tot)
return loss
3.3 VDN vs QMIX 对比
| 维度 | VDN | QMIX |
|---|---|---|
| 表达能力 | 线性加和 | 非线性单调函数 |
| 协调能力 | 弱 | 强 |
| 计算复杂度 | 低 | 中 |
| 理论保证 | 简单 | 单调性保证 |
| 适用场景 | 简单合作 | 复杂协调 |
4. 多智能体 Actor-Critic
4.1 MADDPG(Multi-Agent DDPG)
核心思想(Lowe et al., 2017):CTDE 框架下的 Actor-Critic。
关键设计:
- 训练:Critic 访问所有智能体的动作和状态(中心化)
- 执行:Actor 只使用自己的观测(去中心化)
Critic 更新(智能体 $i$):
\[L(\phi^i) = E \left[ \left( Q^i_{\phi^i}(s, a^1, \ldots, a^N) - y^i \right)^2 \right]\]其中: \(y^i = r^i + \gamma Q^i_{\text{target}}(s', \mu^1_{\theta'}(o^1), \ldots, \mu^N_{\theta'}(o^N))\)
Actor 更新(智能体 $i$):
\[\nabla_{\theta^i} J(\theta^i) = E \left[ \nabla_{\theta^i} \mu^i_{\theta^i}(o^i) \nabla_{a^i} Q^i_{\phi^i}(s, a^1, \ldots, a^N) \Big|_{a^i = \mu^i_{\theta^i}(o^i)} \right]\]优点:
- ✅ 适用于连续动作空间
- ✅ 训练稳定(利用全局信息)
- ✅ 可以学习竞争策略
缺点:
- ❌ 需要全局状态(训练时)
- ❌ 智能体数量多时计算量大
4.2 MAPPO(Multi-Agent PPO)
最新趋势:将 PPO 扩展到多智能体(Yu et al., 2021)。
核心改进:
- 使用 centralized value function $V(s)$
- 每个智能体独立的 Actor
- PPO-Clip 保证训练稳定
伪代码:
class MAPPOAgent:
def __init__(self, n_agents, state_dim, action_dim):
# 每个智能体的 Actor
self.actors = [PolicyNetwork(state_dim, action_dim) for _ in range(n_agents)]
# 中心化 Critic(输入全局状态)
self.critic = ValueNetwork(state_dim * n_agents)
self.optimizers_actor = [optim.Adam(actor.parameters()) for actor in self.actors]
self.optimizer_critic = optim.Adam(self.critic.parameters())
def update(self, batch):
# 计算 GAE
advantages, returns = self.compute_gae(batch)
for epoch in range(self.n_epochs):
for i in range(self.n_agents):
# Actor 更新(PPO-Clip)
new_log_probs, entropy = self.actors[i].evaluate(batch.obs[i], batch.actions[i])
ratio = torch.exp(new_log_probs - batch.old_log_probs[i])
surr1 = ratio * advantages[i]
surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * advantages[i]
actor_loss = -torch.min(surr1, surr2).mean() - self.entropy_coef * entropy.mean()
self.optimizers_actor[i].zero_grad()
actor_loss.backward()
self.optimizers_actor[i].step()
# Critic 更新(所有智能体共享)
global_state = torch.cat([batch.obs[i] for i in range(self.n_agents)], dim=-1)
values = self.critic(global_state)
critic_loss = nn.MSELoss()(values, returns)
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
5. 通信机制
5.1 CommNet(Communication Neural Network)
核心思想:智能体之间通过通信通道共享信息。
架构:
时间步 t:
1. 每个智能体编码自己的观测 → h^i_t
2. 广播通信:c^i_t = Σ_{j≠i} h^j_t / (N-1)
3. 整合通信:h'^i_t = f(h^i_t, c^i_t)
4. 输出动作:a^i_t = π(h'^i_t)
5.2 QCOMM
改进:学习何时以及向谁通信。
注意力机制:
\[\alpha_{ij} = \frac{\exp(e_{ij})}{\sum_{k} \exp(e_{ik})}, \quad e_{ij} = \text{Attention}(h^i, h^j)\] \[c^i = \sum_j \alpha_{ij} h^j\]6. 实战案例
6.1 场景:多机器人协作搬运
环境:3 个机器人需要协同搬运一个大箱子到目标位置。
状态:每个机器人的位置、箱子位置、目标位置 动作:上/下/左/右/抓取/释放 奖励:
- 团队奖励:箱子接近目标 → +1
- 个体惩罚:碰撞 → -0.1
算法选择:QMIX(需要精细协调)
# 使用 PyMARL 框架(伪代码)
from pymarl import QMIX
config = {
'env': 'CooperativeNavigation-v0',
'n_agents': 3,
'algorithm': 'qmix',
'mixer': 'qmix',
'batch_size': 32,
'buffer_size': 5000,
'lr': 0.0005,
'gamma': 0.99,
'epsilon_start': 1.0,
'epsilon_finish': 0.05,
'epsilon_anneal_time': 50000
}
qmix = QMIX(config)
qmix.train(n_episodes=10000)
6.2 场景:多人博弈(德州扑克)
算法选择:MADDPG(混合合作竞争)
关键点:
- 队友之间共享信息(Critic 访问队友动作)
- 对手建模(通过观测推断对手策略)
7. 调试与优化技巧
7.1 可视化智能体策略
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
def visualize_multiagent_episode(env, agents, n_steps=100):
"""可视化多智能体轨迹"""
fig, ax = plt.subplots(figsize=(8, 8))
states = env.reset()
trajectories = [[] for _ in range(len(agents))]
for step in range(n_steps):
actions = [agent.select_action(states[i]) for i, agent in enumerate(agents)]
next_states, _, dones, _ = env.step(actions)
for i in range(len(agents)):
trajectories[i].append(states[i][:2]) # 假设前2维是位置
states = next_states
if all(dones):
break
# 绘制轨迹
for i, traj in enumerate(trajectories):
traj = np.array(traj)
ax.plot(traj[:, 0], traj[:, 1], label=f'Agent {i}', marker='o')
ax.legend()
ax.set_title('Multi-Agent Trajectories')
plt.show()
7.2 监控协调指标
def compute_coordination_metrics(agents, batch):
"""计算协调程度"""
# 动作多样性(熵)
action_distribution = batch.actions.float().mean(dim=0)
action_entropy = -(action_distribution * torch.log(action_distribution + 1e-8)).sum()
# 动作一致性(方差)
action_variance = batch.actions.float().var(dim=0).mean()
# Q 值一致性
q_values = [agent.q_network(batch.states).max(1)[0] for agent in agents]
q_variance = torch.stack(q_values).var(dim=0).mean()
return {
'action_entropy': action_entropy.item(),
'action_variance': action_variance.item(),
'q_variance': q_variance.item()
}
8. 总结与展望
8.1 核心要点
- MARL 挑战:非平稳性、维度灾难、信用分配
- 经典算法:
- IQL:独立学习,简单但次优
- VDN/QMIX:值函数分解,适合合作
- MADDPG:CTDE 框架,连续动作
- MAPPO:当前 SOTA,稳定性强
- CTDE 范式:训练中心化、执行去中心化
8.2 前沿方向
- 大规模 MARL:处理数百甚至数千智能体
- 迁移学习:跨任务、跨智能体数量迁移
- 可解释性:理解协调机制
- 鲁棒性:应对通信延迟、智能体失效
8.3 实践建议
- 🎯 从简单环境开始(如 Particle Envs)
- 🎯 使用成熟框架(PyMARL、EPyMARL)
- 🎯 优先尝试 MAPPO(稳定且性能好)
- 🎯 可视化!可视化!可视化!
参考资源
- 框架:
- 论文:
- Sunehag et al. (2018): Value-Decomposition Networks For Cooperative MARL (VDN)
- Rashid et al. (2018): QMIX: Monotonic Value Function Factorisation (QMIX)
- Lowe et al. (2017): Multi-Agent Actor-Critic (MADDPG)
- Yu et al. (2021): The Surprising Effectiveness of PPO in Cooperative MARL (MAPPO)
- 课程:
- UCL Multi-Agent Systems (Hado van Hasselt)
- Stanford CS 330: Deep Multi-Task and Meta Learning
多智能体强化学习是 RL 领域最具挑战也最具应用潜力的方向。从自动驾驶车队到星际争霸 AI,MARL 正在重塑我们对智能协作的理解。掌握本文的核心算法,你将能够设计和训练复杂的多智能体系统!
💬 交流与讨论
⚠️ 尚未完成 Giscus 配置。请在
_config.yml中设置repo_id与category_id后重新部署,即可启用升级后的评论系统。配置完成后,评论区将自动支持 Markdown 代码高亮与 LaTeX 数学公式渲染,访客回复会同步到 GitHub Discussions,并具备通知功能。