Monte Carlo Tree Search (MCTS) is one of the most important algorithmic breakthroughs in artificial intelligence of the past two decades. It has enabled computers to reach and even surpass top human players in complex games such as Go and chess, and it is a core component of landmark systems like AlphaGo and AlphaZero.

MCTS elegantly combines the systematic nature of tree search with the randomness of Monte Carlo simulation, striking a principled balance between exploration and exploitation. This article dissects MCTS's core principles, key variants, and engineering details.
## 1. MCTS Core Principles

### 1.1 Why MCTS?

Traditional game-tree search (e.g., Minimax with Alpha-Beta pruning) breaks down in games like Go:
| Property | Chess | Go |
|---|---|---|
| Branching factor | ~35 | ~250 |
| Average game length | ~80 moves | ~150 moves |
| State-space size | ~10^47 | ~10^170 |
| Evaluation function | Relatively easy | Extremely hard |
The problems:

- Go's search tree is far too large to traverse exhaustively
- Midgame positions are hard to evaluate (there is no clear material count as in chess)

The MCTS answer:

- Selective expansion: explore only promising branches
- Monte Carlo evaluation: replace a static evaluation function with random playouts
- Anytime improvement: decision quality rises as the number of simulations grows
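The Monte Carlo evaluation idea by itself fits in a few lines. The sketch below assumes the generic, immutable state interface used throughout this article (`is_terminal`, `get_legal_actions`, `apply_action`, `get_result`):

```python
import random

def rollout_value(state, num_rollouts=100):
    """Estimate a position's value by averaging random playouts,
    instead of calling a hand-crafted evaluation function."""
    total = 0.0
    for _ in range(num_rollouts):
        s = state
        while not s.is_terminal():
            s = s.apply_action(random.choice(s.get_legal_actions()))
        total += s.get_result()
    return total / num_rollouts
```

This is the "Monte Carlo" half of MCTS; the tree-search half decides *where* to spend these rollouts.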
### 1.2 The Four Steps of MCTS

```
            Root
             │
      ┌──────┼──────┐
      │      │      │
   Node A  Node B  Node C
      │
   ┌──┼──┐
   D  E  F
```

The loop:

1. Selection – descend from the root until reaching a leaf (or a node with untried actions)
2. Expansion – add one or more child nodes
3. Simulation – play a random game from the new node to a terminal state
4. Backpropagation – propagate the result back up to every node on the path
Pseudocode (Python-style):

```python
import random
import numpy as np

class MCTSNode:
    def __init__(self, state, parent=None, action=None):
        self.state = state          # game state
        self.parent = parent        # parent node
        self.action = action        # action that led to this node
        self.children = []          # list of child nodes
        self.visits = 0             # visit count
        self.wins = 0               # accumulated wins
        self.untried_actions = state.get_legal_actions()

    def uct_select_child(self):
        """Select a child using the UCT formula."""
        return max(self.children, key=lambda c: c.uct_value())

    def uct_value(self, c=1.41):
        """UCT = average win rate + exploration term."""
        if self.visits == 0:
            return float('inf')
        exploitation = self.wins / self.visits
        exploration = c * np.sqrt(np.log(self.parent.visits) / self.visits)
        return exploitation + exploration

    def expand(self):
        """Expand one untried action."""
        action = self.untried_actions.pop()
        next_state = self.state.apply_action(action)
        child = MCTSNode(next_state, parent=self, action=action)
        self.children.append(child)
        return child

    def update(self, result):
        """Record one playout result."""
        self.visits += 1
        self.wins += result

def mcts_search(root_state, num_simulations):
    """MCTS main loop."""
    root = MCTSNode(root_state)
    for _ in range(num_simulations):
        node = root
        # 1. Selection: descend until reaching a leaf or a node
        #    with untried actions
        while node.untried_actions == [] and node.children != []:
            node = node.uct_select_child()
        # 2. Expansion: if there are untried actions, expand one child
        if node.untried_actions != []:
            node = node.expand()
        # 3. Simulation: random playout to a terminal state
        state = node.state
        while not state.is_terminal():
            action = random.choice(state.get_legal_actions())
            state = state.apply_action(action)
        # result must be from the perspective of the player who moved
        # into `node` (1 = win, 0 = loss, 0.5 = draw)
        result = state.get_result()
        # 4. Backpropagation: update statistics along the path,
        #    flipping the result for the opponent at each level
        while node is not None:
            node.update(result)
            result = 1 - result
            node = node.parent
    # return the most-visited action from the root
    return max(root.children, key=lambda c: c.visits).action
```
## 2. UCT: Upper Confidence Bounds applied to Trees

### 2.1 The Exploration–Exploitation Dilemma

How should we choose between two actions?

- Action A: simulated 10 times, 6 wins (60% win rate)
- Action B: simulated 2 times, 2 wins (100% win rate)

Pure exploitation: pick B (currently best)
- Risk: B has few samples and may simply have been lucky; it needs more exploration

Pure exploration: split the budget evenly between A and B
- Risk: wastes simulations on clearly inferior actions
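Plugging the numbers above into the UCT score makes the trade-off concrete. A minimal sketch, assuming both actions share a parent with N = 12 visits and c = 1.41:

```python
import math

def uct(wins, visits, parent_visits, c=1.41):
    """UCT score: average win rate plus an exploration bonus."""
    return wins / visits + c * math.sqrt(math.log(parent_visits) / visits)

N = 12  # parent visits = 10 + 2
score_a = uct(wins=6, visits=10, parent_visits=N)
score_b = uct(wins=2, visits=2, parent_visits=N)
print(f"A: {score_a:.3f}, B: {score_b:.3f}")  # → A: 1.303, B: 2.572
```

B still wins the comparison here, but every additional visit shrinks its exploration bonus, so A overtakes it unless B keeps winning.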
### 2.2 The UCT Formula

\[\text{UCT}(s_i) = \frac{w_i}{n_i} + c \sqrt{\frac{\ln N}{n_i}}\]

- $w_i$: number of wins recorded at node $i$
- $n_i$: number of visits to node $i$
- $N$: number of visits to the parent node
- $c$: exploration constant (commonly $\sqrt{2}$, or tuned empirically)

The two terms:

- Exploitation term $\frac{w_i}{n_i}$: the average win rate; favors nodes that have performed well
- Exploration term $c \sqrt{\frac{\ln N}{n_i}}$: rewards rarely visited nodes

Properties:

- Rarely visited nodes get a large exploration term → exploration is encouraged
- As visits accumulate, the exploration term shrinks → the search converges toward exploitation
2.3 代码实现
import numpy as np
class UCTNode:
def __init__(self, state, parent=None, prior_prob=1.0):
self.state = state
self.parent = parent
self.children = {}
self.visits = 0
self.value_sum = 0 # 累计价值
self.prior = prior_prob # 先验概率(AlphaZero 中使用)
def is_fully_expanded(self):
return len(self.children) == len(self.state.get_legal_actions())
def select_child(self, c_puct=1.0):
"""使用 UCT 或 PUCT 选择最佳子节点"""
def uct_score(child):
# Q值(平均奖励)
Q = child.value_sum / child.visits if child.visits > 0 else 0
# U值(探索奖励)
U = c_puct * child.prior * np.sqrt(self.visits) / (1 + child.visits)
return Q + U
return max(self.children.values(), key=uct_score)
def expand(self, action, next_state, prior_prob):
"""扩展新节点"""
if action not in self.children:
self.children[action] = UCTNode(
state=next_state,
parent=self,
prior_prob=prior_prob
)
return self.children[action]
def update(self, value):
"""更新节点统计"""
self.visits += 1
self.value_sum += value
def get_value(self):
"""获取平均价值"""
return self.value_sum / self.visits if self.visits > 0 else 0
Example of tuning the exploration constant:

```python
def tune_uct_parameter(game, c_values):
    """Estimate the performance of different c values.

    Assumes an `mcts_search` variant that accepts the exploration
    constant as a keyword argument; for a fair comparison, the opponent
    should use a fixed baseline configuration."""
    results = {}
    for c in c_values:
        wins = 0
        num_games = 100
        for _ in range(num_games):
            state = game.initial_state()
            while not state.is_terminal():
                action = mcts_search(state, num_simulations=1000, c_puct=c)
                state = state.apply_action(action)
            if state.winner() == 1:
                wins += 1
        results[c] = wins / num_games
        print(f"c = {c:.2f}: win rate = {results[c]:.2%}")
    best_c = max(results, key=results.get)
    return best_c, results

# Example: candidate c values to test
c_values = [0.5, 1.0, 1.41, 2.0, 3.0]
# best_c, results = tune_uct_parameter(my_game, c_values)
```
## 3. PUCT: Predictor + UCT (AlphaGo/AlphaZero)

### 3.1 Differences from Classic UCT

Classic UCT:

\[\text{UCT}(s) = \frac{w}{n} + c \sqrt{\frac{\ln N}{n}}\]

PUCT (incorporating prior knowledge):

\[\text{PUCT}(s, a) = Q(s, a) + c_{\text{puct}} \cdot P(s, a) \cdot \frac{\sqrt{N(s)}}{1 + N(s, a)}\]

- $Q(s, a)$: average value of action $a$ (MCTS statistics, bootstrapped by neural-network evaluations)
- $P(s, a)$: prior probability predicted by the policy network
- $N(s)$: visit count of state $s$
- $N(s, a)$: visit count of action $a$

Key improvements:

- Prior guidance: the network's $P(s, a)$ steers the search toward promising moves
- Polynomial exploration term: $\frac{\sqrt{N(s)}}{1 + N(s, a)}$ decays more aggressively than the logarithmic term
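To see the difference in decay behavior, compare the two bonuses as a child's visit count grows (a uniform prior P = 1 is assumed here for illustration):

```python
import math

def uct_bonus(N, n, c=1.41):
    """Logarithmic UCT exploration term."""
    return c * math.sqrt(math.log(N) / n)

def puct_bonus(N, n, c_puct=1.0, prior=1.0):
    """Polynomial PUCT exploration term."""
    return c_puct * prior * math.sqrt(N) / (1 + n)

N = 1000  # parent visits
for n in [1, 10, 100]:
    print(n, round(uct_bonus(N, n), 3), round(puct_bonus(N, n), 3))
```

The PUCT bonus starts much larger for unvisited actions and then falls off roughly as 1/n rather than 1/√n, so with a strong prior the search commits faster.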
### 3.2 AlphaZero-Style MCTS

```python
class AlphaZeroMCTS:
    def __init__(self, game, neural_network, num_simulations=800, c_puct=1.0):
        self.game = game
        self.network = neural_network
        self.num_simulations = num_simulations
        self.c_puct = c_puct
        self.root = None

    def search(self, state):
        """Run MCTS from `state`. Values are in [-1, 1] from the
        perspective of the player to move."""
        if self.root is None or self.root.state != state:
            self.root = UCTNode(state)
        for _ in range(self.num_simulations):
            node = self.root
            search_path = [node]
            # 1. Selection: follow the maximum-PUCT path downward
            while node.is_fully_expanded() and node.children:
                node = node.select_child(self.c_puct)
                search_path.append(node)
            # 2. Expansion + evaluation (via the neural network)
            if not node.state.is_terminal():
                # network outputs: policy P(s, a) and value V(s)
                policy_probs, value = self.network.predict(node.state)
                # expand all legal actions
                for action in node.state.get_legal_actions():
                    if action not in node.children:
                        next_state = node.state.apply_action(action)
                        prior_prob = policy_probs[action]
                        node.expand(action, next_state, prior_prob)
            else:
                # terminal state: map get_result() from [0, 1] to [-1, 1]
                value = 2 * node.state.get_result() - 1
            # 3 & 4. Backpropagation, flipping the sign at each level
            for n in reversed(search_path):
                n.update(value)
                value = -value  # opponent's perspective
        return self.root

    def get_action_probs(self, state, temperature=1.0):
        """Turn root visit counts into an action distribution."""
        root = self.search(state)
        action_visits = [
            (action, child.visits)
            for action, child in root.children.items()
        ]
        actions, visits = zip(*action_visits)
        visits = np.array(visits, dtype=np.float64)
        if temperature == 0:
            # deterministic: pick the most-visited action
            probs = np.zeros(len(visits))
            probs[np.argmax(visits)] = 1.0
        else:
            # temperature-scaled, then normalized
            visits = visits ** (1.0 / temperature)
            probs = visits / visits.sum()
        return actions, probs

    def get_best_action(self, state):
        """Return the single best action."""
        actions, probs = self.get_action_probs(state, temperature=0)
        return actions[np.argmax(probs)]
```
Usage example:

```python
class SimpleNeuralNetwork:
    """A stand-in for the real network interface."""
    def predict(self, state):
        """
        Returns:
            - policy: dict {action: probability}
            - value: float in [-1, 1] (predicted outcome for the player to move)
        """
        # a real implementation would call a deep neural network here
        legal_actions = state.get_legal_actions()
        policy = {a: 1.0 / len(legal_actions) for a in legal_actions}
        value = 0.0  # simplification: predict a draw
        return policy, value

# create an MCTS instance
network = SimpleNeuralNetwork()
mcts = AlphaZeroMCTS(
    game=my_game,
    neural_network=network,
    num_simulations=800,
    c_puct=1.0
)

# search and pick a move
state = my_game.initial_state()
best_action = mcts.get_best_action(state)
```
## 4. MCTS Optimization Techniques

### 4.1 RAVE (Rapid Action Value Estimation)

Core idea: use an action's *global* value across simulations to speed up early exploration.

Assumption: the same action has a similar value regardless of when it is played; this holds reasonably well in some games, such as Go.

The RAVE formula:

\[\text{RAVE-UCT}(s, a) = (1 - \beta) \cdot Q_{\text{tree}}(s, a) + \beta \cdot Q_{\text{AMAF}}(a) + c \sqrt{\frac{\ln N}{n}}\]

- $Q_{\text{tree}}(s, a)$: the standard MCTS Q value
- $Q_{\text{AMAF}}(a)$: the "All Moves As First" value, i.e., action $a$'s average result over all simulations in which it appeared
- $\beta$: a weighting factor that decreases as visit counts grow

Implementation:
```python
class RAVENode(UCTNode):
    def __init__(self, state, parent=None):
        super().__init__(state, parent)
        self.amaf_visits = {}   # {action: visit_count}
        self.amaf_value = {}    # {action: total_value}

    def update_amaf(self, action, value):
        """Update AMAF statistics for `action`."""
        if action not in self.amaf_visits:
            self.amaf_visits[action] = 0
            self.amaf_value[action] = 0.0
        self.amaf_visits[action] += 1
        self.amaf_value[action] += value

    def rave_value(self, action, c=1.0, b=0.1):
        """Compute the RAVE-UCT score for `action`."""
        # tree statistics
        child = self.children.get(action)
        if child is None or child.visits == 0:
            return float('inf')
        n_tree = child.visits
        q_tree = child.value_sum / n_tree
        # AMAF statistics
        n_amaf = self.amaf_visits.get(action, 0)
        q_amaf = (self.amaf_value.get(action, 0) / n_amaf) if n_amaf > 0 else 0
        # weighting factor β (Gelly & Silver's schedule; b is the RAVE
        # bias parameter, typically a small constant)
        beta = n_amaf / (n_tree + n_amaf + 4 * b * b * n_tree * n_amaf)
        # blended value
        q_mix = (1 - beta) * q_tree + beta * q_amaf
        # plus the exploration term
        exploration = c * np.sqrt(np.log(self.visits) / n_tree)
        return q_mix + exploration

    def select_child_rave(self, c=1.0):
        """Select the action with the highest RAVE-UCT score."""
        return max(
            self.children.keys(),
            key=lambda a: self.rave_value(a, c)
        )

def mcts_with_rave(root_state, num_simulations):
    """MCTS with RAVE/AMAF updates."""
    root = RAVENode(root_state)
    for _ in range(num_simulations):
        node = root
        path = [node]
        actions_in_simulation = []
        # Selection
        while node.is_fully_expanded() and node.children:
            action = node.select_child_rave()
            actions_in_simulation.append(action)
            node = node.children[action]
            path.append(node)
        # Expansion: pick an action that has not been expanded yet
        if not node.is_fully_expanded():
            untried = [a for a in node.state.get_legal_actions()
                       if a not in node.children]
            action = random.choice(untried)
            actions_in_simulation.append(action)
            next_state = node.state.apply_action(action)
            node = node.expand(action, next_state, 1.0)
            path.append(node)
        # Simulation
        state = node.state
        while not state.is_terminal():
            action = random.choice(state.get_legal_actions())
            actions_in_simulation.append(action)
            state = state.apply_action(action)
        result = state.get_result()
        # Backpropagation (including AMAF updates); a full RAVE
        # implementation also distinguishes which player made each move
        for i, n in enumerate(reversed(path)):
            n.update(result)
            # AMAF: all actions played from this node onward
            depth = len(path) - 1 - i
            for action in actions_in_simulation[depth:]:
                n.update_amaf(action, result)
            result = 1 - result
    return max(root.children.keys(), key=lambda a: root.children[a].visits)
```
### 4.2 Virtual Loss

Problem: in parallel MCTS, multiple threads tend to descend into the same node simultaneously.

Solution: add a "virtual loss" to nodes currently being evaluated, making them temporarily less attractive to other threads.

```python
class VirtualLossNode(UCTNode):
    def __init__(self, state, parent=None):
        super().__init__(state, parent)
        self.virtual_loss = 0

    def get_value_with_virtual_loss(self):
        """Node value with pending evaluations penalized as losses.
        (For the penalty to take effect, selection must use this value.)"""
        if self.visits == 0:
            return 0
        return (self.value_sum - self.virtual_loss) / (self.visits + self.virtual_loss)

    def add_virtual_loss(self, loss=1):
        """Add a virtual loss."""
        self.virtual_loss += loss

    def remove_virtual_loss(self, loss=1):
        """Remove a virtual loss."""
        self.virtual_loss -= loss
```
```python
import threading

class ParallelMCTS:
    def __init__(self, game, num_threads=4):
        self.game = game
        self.num_threads = num_threads
        self.lock = threading.Lock()

    def search_thread(self, root, num_simulations_per_thread):
        """Search loop run by a single thread."""
        for _ in range(num_simulations_per_thread):
            node = root
            path = []
            # Selection (under the lock)
            with self.lock:
                while node.is_fully_expanded() and node.children:
                    node = node.select_child()
                    node.add_virtual_loss()  # discourage other threads
                    path.append(node)
            # Expansion and simulation (no lock needed)
            if not node.state.is_terminal():
                # ... expansion and simulation logic
                value = simulate(node.state)  # rollout helper, as before
            else:
                value = node.state.get_result()
            # Backpropagation (under the lock)
            with self.lock:
                for n in reversed(path):
                    n.update(value)
                    n.remove_virtual_loss()
                    value = 1 - value

    def search(self, state, total_simulations):
        """Run the search across multiple threads."""
        root = VirtualLossNode(state)
        simulations_per_thread = total_simulations // self.num_threads
        threads = []
        for _ in range(self.num_threads):
            t = threading.Thread(
                target=self.search_thread,
                args=(root, simulations_per_thread)
            )
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        return max(root.children.keys(), key=lambda a: root.children[a].visits)
```
### 4.3 Progressive Widening

Problem: in games with a huge branching factor (such as Go), expanding all children of a node at once is infeasible.

Solution: expand children gradually, prioritizing promising actions.

```python
class ProgressiveWideningNode(UCTNode):
    def __init__(self, state, parent=None, k=10, alpha=0.5):
        super().__init__(state, parent)
        self.k = k              # widening coefficient
        self.alpha = alpha      # widening exponent
        self.untried_actions = state.get_legal_actions()

    def should_expand(self):
        """Allow a new child only once the visit count justifies it:
        |children| < k * visits^alpha."""
        allowed_children = self.k * (self.visits ** self.alpha)
        return len(self.children) < allowed_children

    def expand_progressive(self):
        """Progressively expand one child."""
        if self.should_expand() and self.untried_actions:
            # pick the most promising untried action
            action = self.select_best_untried_action()
            self.untried_actions.remove(action)
            next_state = self.state.apply_action(action)
            child = ProgressiveWideningNode(next_state, parent=self)
            self.children[action] = child
            return child
        return None

    def select_best_untried_action(self):
        """Choose the most promising untried action."""
        # simplified version: random choice; in practice, score actions
        # with a heuristic or a neural network
        return random.choice(self.untried_actions)
```
## 5. Worked Example: Complete MCTS for Tic-Tac-Toe

```python
import numpy as np
import random
from copy import deepcopy

class TicTacToe:
    """Tic-tac-toe game state."""
    def __init__(self):
        self.board = np.zeros((3, 3), dtype=int)
        self.current_player = 1

    def get_legal_actions(self):
        """All empty squares."""
        return [(i, j) for i in range(3) for j in range(3) if self.board[i, j] == 0]

    def apply_action(self, action):
        """Apply a move and return the resulting state."""
        new_state = deepcopy(self)
        i, j = action
        new_state.board[i, j] = new_state.current_player
        new_state.current_player = -new_state.current_player
        return new_state

    def is_terminal(self):
        """Is the game over?"""
        return self.get_winner() is not None or len(self.get_legal_actions()) == 0

    def get_winner(self):
        """Winner: 1, -1, or None."""
        # rows
        for i in range(3):
            if abs(self.board[i, :].sum()) == 3:
                return self.board[i, 0]
        # columns
        for j in range(3):
            if abs(self.board[:, j].sum()) == 3:
                return self.board[0, j]
        # diagonals
        if abs(np.trace(self.board)) == 3:
            return self.board[0, 0]
        if abs(np.trace(np.fliplr(self.board))) == 3:
            return self.board[0, 2]
        return None

    def get_result(self):
        """Result from the perspective of the player to move
        (1 = win, 0 = loss, 0.5 = draw)."""
        winner = self.get_winner()
        if winner == self.current_player:
            return 1.0
        elif winner == -self.current_player:
            return 0.0
        else:
            return 0.5

    def display(self):
        """Print the board."""
        symbols = {0: '.', 1: 'X', -1: 'O'}
        for i in range(3):
            print(' '.join(symbols[self.board[i, j]] for j in range(3)))
        print()
```
```python
class TicTacToeMCTSNode:
    """MCTS node for tic-tac-toe. A node's value is stored from the
    perspective of the player who moved into it."""
    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent
        self.children = {}
        self.visits = 0
        self.value_sum = 0
        self.untried_actions = state.get_legal_actions()

    def is_fully_expanded(self):
        return len(self.untried_actions) == 0

    def select_child(self, c=1.41):
        """UCT selection."""
        def uct_value(child):
            if child.visits == 0:
                return float('inf')
            exploitation = child.value_sum / child.visits
            exploration = c * np.sqrt(np.log(self.visits) / child.visits)
            return exploitation + exploration
        return max(self.children.values(), key=uct_value)

    def expand(self):
        """Expand one child."""
        action = self.untried_actions.pop()
        next_state = self.state.apply_action(action)
        child = TicTacToeMCTSNode(next_state, parent=self)
        self.children[action] = child
        return child

    def simulate(self):
        """Random rollout to a terminal state; returns the result from
        the perspective of the player who moved into this node."""
        state = deepcopy(self.state)
        while not state.is_terminal():
            action = random.choice(state.get_legal_actions())
            state = state.apply_action(action)
        winner = state.get_winner()
        if winner is None:
            return 0.5
        # the player who moved into this node is the opponent of the
        # player to move here
        return 1.0 if winner == -self.state.current_player else 0.0

    def backpropagate(self, result):
        """Propagate the result up the tree, flipping the perspective."""
        self.visits += 1
        self.value_sum += result
        if self.parent:
            self.parent.backpropagate(1 - result)

def mcts_tictactoe(state, num_simulations=1000):
    """MCTS search for tic-tac-toe."""
    root = TicTacToeMCTSNode(state)
    for _ in range(num_simulations):
        node = root
        # 1. Selection
        while node.is_fully_expanded() and node.children:
            node = node.select_child()
        # 2. Expansion
        if not node.is_fully_expanded():
            node = node.expand()
        # 3. Simulation
        result = node.simulate()
        # 4. Backpropagation
        node.backpropagate(result)
    # return the most-visited action
    best_action = max(
        root.children.items(),
        key=lambda item: item[1].visits
    )[0]
    return best_action
```
Test: MCTS vs. a random player.

```python
def play_game():
    """Play one full game."""
    game = TicTacToe()
    while not game.is_terminal():
        game.display()
        if game.current_player == 1:
            # MCTS player
            action = mcts_tictactoe(game, num_simulations=1000)
            print(f"MCTS plays: {action}")
        else:
            # random player
            action = random.choice(game.get_legal_actions())
            print(f"Random plays: {action}")
        game = game.apply_action(action)
    game.display()
    winner = game.get_winner()
    if winner == 1:
        print("MCTS wins!")
    elif winner == -1:
        print("Random player wins!")
    else:
        print("Draw!")

# play_game()
```
## 6. Performance Analysis and Tuning

### 6.1 Simulation Budget vs. Playing Strength

```python
def evaluate_strength(num_simulations_list, num_games=100):
    """Estimate playing strength for different simulation budgets."""
    results = {}
    for num_sims in num_simulations_list:
        wins = 0
        for _ in range(num_games):
            game = TicTacToe()
            while not game.is_terminal():
                if game.current_player == 1:
                    # player under test
                    action = mcts_tictactoe(game, num_simulations=num_sims)
                else:
                    # opponent with a fixed budget (e.g., 100 simulations)
                    action = mcts_tictactoe(game, num_simulations=100)
                game = game.apply_action(action)
            if game.get_winner() == 1:
                wins += 1
        winrate = wins / num_games
        results[num_sims] = winrate
        print(f"simulations={num_sims}: win rate={winrate:.2%}")
    return results

# Example:
# simulations = [10, 50, 100, 500, 1000]
# results = evaluate_strength(simulations, num_games=50)
```
### 6.2 The Effect of the Exploration Constant c

```python
import matplotlib.pyplot as plt

def visualize_uct_exploration():
    """Visualize the UCT exploration/exploitation balance."""
    N = 100  # parent visit count
    c_values = [0.5, 1.0, 1.41, 2.0]
    n_range = np.arange(1, N + 1)
    plt.figure(figsize=(12, 6))
    for c in c_values:
        exploration_bonus = c * np.sqrt(np.log(N) / n_range)
        plt.plot(n_range, exploration_bonus, label=f'c={c}', linewidth=2)
    plt.xlabel('Node visit count n')
    plt.ylabel('Exploration bonus')
    plt.title('UCT exploration term vs. visit count')
    plt.legend()
    plt.grid(alpha=0.3)
    plt.show()

# visualize_uct_exploration()
```
## 7. Summary and Outlook

### Key Takeaways

- The four MCTS steps
  - Selection: UCT-guided tree traversal
  - Expansion: adding new nodes
  - Simulation: evaluation via random playouts
  - Backpropagation: updating the statistics along the path
- Key algorithms
  - UCT: balances exploration and exploitation
  - PUCT: incorporates neural-network priors
  - RAVE: exploits the global value of actions
  - Virtual loss: enables parallel search
- When MCTS fits
  - ✅ Games with large branching factors
  - ✅ Problems where an evaluation function is hard to design
  - ✅ Scenarios that require online planning
  - ❌ Very small state spaces (plain Minimax suffices)
  - ❌ Domains where a single simulation is extremely expensive
### Practical Advice

Parameter tuning:

- `num_simulations`: 1,000–10,000 (depending on the time budget)
- `c_puct`: 1.0–2.0 (determine experimentally)
- `temperature`: 1.0 early in training, annealed to around 0.1 later
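As one illustration of the temperature guideline (the exact schedule is a design choice, not prescribed by the algorithm), a simple step anneal might look like:

```python
def temperature_schedule(move_number, high=1.0, low=0.1, cutoff=30):
    """Hypothetical schedule: sample with a high temperature for the
    first `cutoff` moves to diversify self-play openings, then drop to
    a low temperature for near-deterministic play."""
    return high if move_number < cutoff else low

# Example: temperatures over the course of a game
temps = [temperature_schedule(m) for m in range(0, 60, 10)]
```

The returned value plugs directly into `get_action_probs(state, temperature=...)` from Section 3.2.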
Engineering optimizations:

- Move the core loop to C++/Cython
- Parallelize (virtual loss + root parallelization)
- Reuse the tree (keep the relevant subtree for the next move)
- Terminate early once the best move is decided with high confidence
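Tree reuse, for instance, can be as simple as promoting the played move's child to be the new root. A sketch against the `children`/`parent` fields of the node classes above:

```python
def reuse_subtree(root, action_played):
    """After `action_played` is made on the board, keep that child's
    subtree as the new search tree instead of starting from scratch."""
    child = root.children.get(action_played)
    if child is None:
        return None  # move was never explored; rebuild the tree
    child.parent = None  # detach so backpropagation stops here
    return child
```

All statistics accumulated below the new root remain valid and give the next search a warm start.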
Combining with deep learning:

- A policy network supplies prior probabilities
- A value network replaces random rollouts
- Self-play generates training data
### Frontier Directions

- MuZero: learns an environment model, requiring no game rules
- Gumbel AlphaZero: improved action sampling
- GPU MCTS: GPU-accelerated simulation
- Continuous action spaces: extensions toward robot control
### References

Classic papers:

- Browne et al., "A Survey of Monte Carlo Tree Search Methods" (2012)
- Silver et al., "Mastering the game of Go with deep neural networks and tree search" (AlphaGo, 2016)
- Silver et al., "Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm" (AlphaZero, 2017)

Open-source implementations:

- python-mcts – a concise Python implementation
- AlphaZero General – an AlphaZero framework
- KataGo – a Go AI (MCTS + neural networks)
MCTS is a cornerstone of modern game AI; mastering it opens the door to reinforcement learning and game AI at large!