Skip to content

s19 强化学习入门:MDP 与 Q-Learning — exercise.py 练习指南

Download exercise.py

练习目标

通过手写 Q-Learning 的三个核心组件,建立对强化学习中"试错学习"的深刻直觉。完成后你将能够:

  1. 独立写出 Q-Learning 的 TD 更新规则
  2. 实现 ε-贪婪策略的动作选择
  3. 理解动态环境(障碍物变化)对强化学习的影响

预备知识

  • Q-Learning 更新公式Q(s,a)Q(s,a)+α[r+γmaxaQ(s,a)Q(s,a)]
  • MDP 五元组M=(S,A,P,R,γ)
  • 探索 vs 利用ε-贪婪策略在随机探索和贪婪利用之间权衡
  • Model-Free:不需要知道环境模型 P(s|s,a)

任务清单

练习 1:实现 Q-Learning 的 TD 更新规则

目标:补全 q_learning_update() 函数,这是 Q-Learning 算法的核心。

核心公式

Q(s,a)Q(s,a)+α[r+γmaxaQ(s,a)Q(s,a)]TD 误差 δ

TODO 步骤

python
def q_learning_update(q_table, state_idx, action, reward,
                      next_state_idx, done, alpha=0.1, gamma=0.95):
    # 步骤 1: 获取当前 Q(s, a)
    current_q = q_table[state_idx, action]

    # 步骤 2: 计算 TD 目标
    if done:
        # 终止状态:未来价值为 0,TD 目标只有即时奖励
        td_target = reward
    else:
        # 非终止状态:即时奖励 + 打折后的最佳未来价值
        max_next_q = np.max(q_table[next_state_idx])   # max_{a'} Q(s', a')
        td_target = reward + gamma * max_next_q

    # 步骤 3: 计算 TD 误差
    td_error = td_target - current_q

    # 步骤 4: 更新 Q 值
    q_table[state_idx, action] = current_q + alpha * td_error
    return q_table

逐行详解

  1. current_q = q_table[state_idx, action]:当前对 (s,a) 的估值。初始时 Q 表全为零,Agent 什么都不知道。

  2. done 分支:如果下一状态是终止状态(终点/陷阱),未来没有更多奖励,TD 目标就是即时奖励 r。这个分支确保了到达终点时 (s,a) 的 Q 值直接反映终点奖励。

  3. max_next_q = np.max(q_table[next_state_idx]):取下一状态所有可能动作的 Q 值中的最大值。这体现了 Q-Learning 的 off-policy 特性——我们用最优策略来选择下一动作(argmax),但实际执行的动作由行为策略(ε-贪婪)决定。

  4. td_error = td_target - current_q:TD 误差告诉 Agent "你的估计偏离目标多少":

    • 正 TD 误差 δ>0:当前估计太低,需要提高
    • 负 TD 误差 δ<0:当前估计太高,需要降低
  5. q_table += alpha * td_error:朝 TD 目标的方向走一小步。步长由 α 控制。

验证测试:代码提供了两个测试用例——

测试 1 — 非终止状态

  • 初始 Q 表全零,state_idx=0, action=1, reward=5.0, alpha=0.1, gamma=0.9
  • 更新后:Q(0,1)=0+0.1×(5.0+0.9×00)=0.5

测试 2 — 终止状态

  • Q(1,0) 初始为 1.0, reward=10.0, done=True, alpha=0.5
  • 更新后:Q(1,0)=1.0+0.5×(10.01.0)=5.5

预期输出

TODO 1 测试: Q-Learning 的 TD 更新规则
  测试 1 [非终止状态]:
    更新前 Q(0,1) = 0.0
    更新后 Q(0,1) = 0.5000
    预期: 0.0 + 0.1 × (5.0 + 0.9 × 0 - 0.0) = 0.5
    ✓ 测试通过!
  测试 2 [终止状态]:
    更新前 Q(1,0) = 1.0
    更新后 Q(1,0) = 5.5000
    预期: 1.0 + 0.5 × (10.0 - 1.0) = 5.5
    ✓ 测试通过!

练习 2:实现 ε-贪婪动作选择

目标:补全 epsilon_greedy_action() 函数,实现探索与利用的权衡。

核心规则

at={随机动作以概率 εargmaxaQ(s,a)以概率 1ε

TODO 步骤

python
def epsilon_greedy_action(q_table, state_idx, epsilon):
    if np.random.random() < epsilon:
        # 探索:随机选择任一动作
        action = np.random.randint(q_table.shape[1])
    else:
        # 利用:选择 Q 值最大的动作
        # 注意:当多个动作 Q 值相同时,应随机选择一个(而非总是第一个)
        q_vals = q_table[state_idx]
        max_q = np.max(q_vals)
        best_actions = np.where(q_vals == max_q)[0]   # 所有最大值动作的索引
        action = np.random.choice(best_actions)         # 随机选一个
    return action

关键要点

  1. np.random.random() < epsilonrandom.random() 返回 [0, 1) 之间的均匀随机数。当它小于 ε 时进入探索分支。

  2. np.random.randint(q_table.shape[1]):从 [0,n_actions1] 之间均匀随机选择。所有动作被选中的概率相等——这是"无偏"探索。

  3. 最大值平局处理:当多个动作的 Q 值相同(都是最大值)时,np.argmax 只返回第一个——这会造成系统性偏差。使用 np.where(q_vals == max_q)[0] 找出所有最大值的索引,然后 np.random.choice 随机选一个。这确保当 Q 值相同时(如初始全零),探索分支仍能均匀覆盖所有动作。

验证测试

  • ε=0(纯利用):100 次选择应全部为最大 Q 值动作
  • ε=1(纯探索):500 次选择应覆盖全部 4 个动作
  • ε=0.5(混合):约 50% 利用(选动作 1 或 3),50% 探索(可能选动作 0 或 2)

预期输出

TODO 2 测试: ε-贪婪动作选择
  测试 1 [ε=0]: 100 次选择结果={1, 3}
    ✓ 测试通过! (全部为最大 Q 值动作)
  测试 2 [ε=1]: 500 次选择的动作集合={0, 1, 2, 3}
    ✓ 测试通过! (覆盖了全部 4 个动作)
  测试 3 [ε=0.5]: 利用=~500, 探索=~500

练习 3:添加动态环境特征——移动障碍物

目标:实现动态变化的障碍物机制,理解强化学习在非稳态环境中的挑战。

核心思想:训练过程中,环境的奖励函数/转移概率发生变化——Agent 不仅要学习最优策略,还要适应环境的改变。这更接近真实世界(市场变化、用户偏好变化等)。

TODO 步骤

python
def add_moving_obstacle(env, cur_episode):
    periodic_idx = cur_episode // 100     # 每 100 episode 更换一次配置

    obstacle_configs = [
        [(1, 1), (3, 3)],   # 配置 0: 对角线障碍
        [(1, 3), (3, 1)],   # 配置 1: 反对角线障碍
        [(2, 2)],            # 配置 2: 中心障碍
        [(1, 2), (2, 1), (3, 2)],  # 配置 3: 密集障碍
    ]

    idx = periodic_idx % len(obstacle_configs)
    return obstacle_configs[idx]

周期性变化的设计:每 100 个 episode 换一次障碍物布局。这创建了一个"适应-变化-再适应"的循环:

  • Episode 0-99:障碍在 (1,1) 和 (3,3)
  • Episode 100-199:障碍换到 (1,3) 和 (3,1)
  • Episode 200-299:障碍换到 (2,2)
  • Episode 300-399:障碍换到 (1,2), (2,1), (3,2)
  • Episode 400+:循环回到配置 0

对 Agent 的影响

  • 每次障碍物改变时,Agent 已学到的 Q 值可能不再适用
  • Agent 需要通过新的探索来更新 Q 值
  • 这测试了 Agent 的适应能力——能否快速从环境变化中恢复

障碍物设计原则

  1. 不能与起点 (0,0) 或终点 (4,4) 重合(否则 episode 无法正常开始或结束)
  2. 不同配置间有足够差异——测试 Agent 是否能适应不同的环境布局
  3. 配置 3 障碍物最多,路径选择最受限——最困难的场景

预期输出

TODO 3 测试: 动态障碍物
  Episode    0 → 障碍物: [(1, 1), (3, 3)]
  Episode   50 → 障碍物: [(1, 1), (3, 3)]
  Episode  100 → 障碍物: [(1, 3), (3, 1)]
  Episode  199 → 障碍物: [(1, 3), (3, 1)]
  Episode  300 → 障碍物: [(2, 2)]
  Episode  399 → 障碍物: [(2, 2)]
  ✓ 周期性正确! Episode 0 和 100 配置相同

三个练习的关系

练习Q-Learning 组件在算法中的位置
练习 1: TD 更新学习规则每一步交互后更新 Q 表
练习 2: ε-贪婪行为策略每次选择动作时调用
练习 3: 动态障碍环境设计改变 MDP 的 RP

这三个组件构成了 Q-Learning 的完整循环:用 ε-贪婪选择动作(练习 2)→ 执行动作并获得奖励 → 用 TD 更新改进 Q 表(练习 1)→ 面对动态变化的环境(练习 3)。

检查要点

运行 python exercise.py,确认:

  • [ ] 练习 1 两个测试用例均通过(非终止状态 0.5, 终止状态 5.5)
  • [ ] 练习 2 ε=0 只选最优动作,ε=1 覆盖所有动作
  • [ ] 练习 3 障碍物周期性正确,相同周期内配置一致

完成练习后,返回 demo.py 观察完整的 GridWorld 环境、Q-Learning Agent 和消融实验——这些都是对练习中三个核心概念的深入应用。

完整代码

py
# -*- coding: utf-8 -*-
"""
s19 强化学习入门:MDP 与 Q-Learning — 练习代码
================================================
请完成以下 TODO 任务,巩固对 Q-Learning 核心机制的理解。

每个 TODO 都有详细的指示和预期输出描述。
建议先阅读 README.md,再尝试独立补全代码。
运行方式:在 s19_rl_qlearning/ 目录下执行 python code/exercise.py
"""

import numpy as np
from typing import Tuple, List, Dict, Optional


# ============================================================================
# 辅助类:简化版 GridWorld(复用 demo 中的逻辑)
# ============================================================================

class MiniGridWorld:
    """
    简化版网格世界——用于练习。

    一个 5×5 网格,起点 (0,0),终点 (4,4),无陷阱。
    动作: 0=上, 1=下, 2=左, 3=右
    """

    def __init__(self):
        self.size = 5                                          # 5×5 小网格
        self.start = (0, 0)                                    # 起点
        self.goal = (4, 4)                                     # 终点
        self.state = self.start                                # 当前状态
        self.action_deltas = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 上/下/左/右
        self.action_names = ["上", "下", "左", "右"]

    def reset(self):
        """重置环境到起点。"""
        self.state = self.start
        return self.state

    def step(self, action: int) -> Tuple[Tuple[int, int], float, bool]:
        """
        执行动作。到达终点获得 +10 奖励,每步 -0.1。
        """
        dr, dc = self.action_deltas[action]
        new_r = self.state[0] + dr
        new_c = self.state[1] + dc
        if 0 <= new_r < self.size and 0 <= new_c < self.size:
            self.state = (new_r, new_c)
        if self.state == self.goal:
            return self.state, 10.0, True                       # 到达终点
        else:
            return self.state, -0.1, False                      # 普通步数

    def get_state_index(self, state: Tuple[int, int]) -> int:
        """状态 → 行优先索引。"""
        return state[0] * self.size + state[1]


# ============================================================================
# TODO 1: 实现 Q-Learning 的 TD 更新规则
# ============================================================================
def q_learning_update(
    q_table: np.ndarray,
    state_idx: int,
    action: int,
    reward: float,
    next_state_idx: int,
    done: bool,
    alpha: float = 0.1,
    gamma: float = 0.95,
) -> np.ndarray:
    """
    TODO 1: 实现 Q-Learning 的单步 TD 更新。

    更新公式:
        Q(s,a) ← Q(s,a) + α × (r + γ × max_{a'} Q(s',a') - Q(s,a))

    其中:
        - 如果 done=True(终止状态),则 max_{a'} Q(s',a') = 0
        - 否则,max_{a'} Q(s',a') 是下一状态 Q 值中最大的那个

    参数:
        q_table: Q 值表,shape (n_states, n_actions)
        state_idx: 当前状态索引 s
        action: 执行的动作 a
        reward: 获得的奖励 r
        next_state_idx: 下一状态索引 s'
        done: 是否到达终止状态
        alpha: 学习率 α
        gamma: 折扣因子 γ

    返回:
        q_table: 更新后的 Q 表(原地修改也需返回)
    """
    # TODO: 补全以下代码

    # 步骤 1: 获取当前 Q(s, a) 值
    current_q = None  # ← TODO: q_table[state_idx, action]

    # 步骤 2: 计算 TD 目标
    # 提示: 如果 done=True,TD 目标 = reward
    #       如果 done=False,TD 目标 = reward + gamma * max(Q[next_state_idx, :])
    if done:
        td_target = None  # ← TODO: 终止状态: TD 目标只有即时奖励
    else:
        max_next_q = None  # ← TODO: np.max(q_table[next_state_idx])
        td_target = None   # ← TODO: reward + gamma * max_next_q

    # 步骤 3: 计算 TD 误差 δ = td_target - current_q
    td_error = None  # ← TODO: td_target - current_q

    # 步骤 4: 更新 Q(s, a)
    # q_table[state_idx, action] += alpha * td_error
    q_table[state_idx, action] = None  # ← TODO: 实现更新

    return q_table


# ---- 测试 TODO 1 ----
def test_q_update():
    """测试 Q-Learning 更新规则。"""
    print("=" * 60)
    print("TODO 1 测试: Q-Learning 的 TD 更新规则")
    print("=" * 60)

    # 创建一个小型 Q 表: 3 个状态,2 个动作
    n_states, n_actions = 3, 2
    q_table = np.zeros((n_states, n_actions))

    # 测试 1: 非终止状态下的更新
    # 状态 0 执行动作 1 → 到达状态 1,奖励 +5
    q_before = q_table[0, 1].copy()
    q_table = q_learning_update(
        q_table, state_idx=0, action=1, reward=5.0,
        next_state_idx=1, done=False, alpha=0.1, gamma=0.9
    )

    if q_table[0, 1] != 0.0:
        print(f"  测试 1 [非终止状态]:")
        print(f"    更新前 Q(0,1) = {q_before}")
        print(f"    更新后 Q(0,1) = {q_table[0,1]:.4f}")
        print(f"    预期: 0.0 + 0.1 × (5.0 + 0.9 × 0 - 0.0) = 0.5")
        if abs(q_table[0, 1] - 0.5) < 0.001:
            print(f"    ✓ 测试通过!")
        else:
            print(f"    ✗ 测试失败: 期望 0.5, 得到 {q_table[0,1]:.4f}")
    else:
        print("  TODO 未完成,请补全 q_learning_update 函数")

    # 测试 2: 终止状态下的更新
    q_table = np.zeros((n_states, n_actions))                  # 重置 Q 表
    q_table[1, 0] = 1.0                                        # 设置一个初始值
    q_table = q_learning_update(
        q_table, state_idx=1, action=0, reward=10.0,
        next_state_idx=2, done=True, alpha=0.5, gamma=0.9
    )

    if q_table[1, 0] != 1.0:
        print(f"  测试 2 [终止状态]:")
        print(f"    更新前 Q(1,0) = 1.0")
        print(f"    更新后 Q(1,0) = {q_table[1,0]:.4f}")
        # Q = 1.0 + 0.5 × (10.0 - 1.0) = 5.5
        print(f"    预期: 1.0 + 0.5 × (10.0 - 1.0) = 5.5")
        if abs(q_table[1, 0] - 5.5) < 0.001:
            print(f"    ✓ 测试通过!")
        else:
            print(f"    ✗ 测试失败: 期望 5.5, 得到 {q_table[1,0]:.4f}")
    else:
        print("  TODO 未完成,请补全 q_learning_update 函数 (done=True 分支)")

    print()


# ============================================================================
# TODO 2: 实现 ε-贪婪动作选择
# ============================================================================
def epsilon_greedy_action(
    q_table: np.ndarray,
    state_idx: int,
    epsilon: float,
) -> int:
    """
    TODO 2: 实现 ε-贪婪策略的动作选择。

    规则:
        - 以概率 ε: 随机选择一个动作(探索)
        - 以概率 1-ε: 选择 Q 值最大的动作(利用)
        - 当多个动作 Q 值相等时,随机选择其中一个

    参数:
        q_table: Q 值表,shape (n_states, n_actions)
        state_idx: 当前状态索引
        epsilon: 探索率 (0 ≤ ε ≤ 1)

    返回:
        action: 选择的动作索引
    """
    # TODO: 补全以下代码

    # 提示: 使用 np.random.random() 生成 [0,1) 之间的随机数
    # 如果 random < epsilon: 随机选择动作
    # 否则: 选择 argmax Q(s, :)
    # 提示: 当多个动作 Q 值相同时,用 np.argmax 只返回第一个,
    #       可以用 np.where(q_vals == q_vals.max())[0] 获取所有最大值,再随机选

    action = None  # ← TODO: 实现 ε-贪婪动作选择
    return action


# ---- 测试 TODO 2 ----
def test_epsilon_greedy():
    """测试 ε-贪婪动作选择。"""
    print("=" * 60)
    print("TODO 2 测试: ε-贪婪动作选择")
    print("=" * 60)

    # 创建 Q 表: 1 个状态,4 个动作
    q_table = np.array([[1.0, 5.0, 1.0, 5.0]])                # 动作 1 和 3 都是最大值

    # 测试 1: ε=0 → 100% 利用,应该选动作 1 或 3
    if epsilon_greedy_action(q_table, 0, 0.0) is None:
        print("  TODO 未完成,请补全 epsilon_greedy_action 函数")
    else:
        actions_chosen = [epsilon_greedy_action(q_table, 0, 0.0)
                         for _ in range(100)]
        unique = set(actions_chosen)
        if unique == {1, 3} or unique == {1} or unique == {3}:
            print(f"  测试 1 [ε=0, 纯利用]: 100 次选择结果={sorted(unique)}")
            print(f"    ✓ 测试通过! (全部为最大 Q 值动作: 1 和/或 3)")
        else:
            print(f"  测试 1 [ε=0, 纯利用]: 100 次选择结果={sorted(unique)}")
            print(f"    ✗ 测试失败: 含非最大 Q 值动作")

        # 测试 2: ε=1 → 100% 探索,应该覆盖所有 4 个动作
        actions_chosen = [epsilon_greedy_action(q_table, 0, 1.0)
                         for _ in range(500)]
        unique = set(actions_chosen)
        if unique == {0, 1, 2, 3}:
            print(f"  测试 2 [ε=1, 纯探索]: 500 次选择的动作集合={sorted(unique)}")
            print(f"    ✓ 测试通过! (覆盖了全部 4 个动作)")
        else:
            print(f"  测试 2 [ε=1, 纯探索]: 500 次选择的动作集合={sorted(unique)}")
            print(f"    ✗ 测试失败: 未覆盖全部动作")

        # 测试 3: ε=0.5 → 混合,约 50% 探索 + 50% 利用
        actions_chosen = [epsilon_greedy_action(q_table, 0, 0.5)
                         for _ in range(1000)]
        exploit_count = sum(1 for a in actions_chosen if a in [1, 3])  # 利用动作
        explore_count = sum(1 for a in actions_chosen if a in [0, 2])  # 探索动作
        print(f"  测试 3 [ε=0.5, 混合]: 利用={exploit_count}, 探索={explore_count}")
        print(f"    (预期约为 500 探索 + 500 利用)")

    print()


# ============================================================================
# TODO 3: 添加新环境特征——动态障碍/陷阱
# ============================================================================
def add_moving_obstacle(
    env: MiniGridWorld,
    cur_episode: int,
):
    """
    TODO 3: 为 MiniGridWorld 添加动态变化的障碍物。

    任务:
        创建一个"移动障碍"机制,障碍物的位置随 episode 编号变化,
        而不是固定在某个位置。Agent 踩到障碍物时获得 -5 的惩罚。

    具体要求:
    1. 设计一个函数,根据 cur_episode(当前 episode 编号)返回障碍物位置列表
    2. 障碍物位置随 episode 周期性地改变(例如每 100 个 episode 换一个位置)
    3. 在 step() 函数中添加障碍物检测逻辑
    4. 修改训练循环,每 episode 开始时调用此函数更新障碍

    参数:
        env: MiniGridWorld 环境实例
        cur_episode: 当前 episode 编号

    返回:
        obstacle_positions: 当前 episode 的障碍物位置列表 [(row,col), ...]
    """
    # TODO: 实现动态障碍机制
    # 提示 1: 使用 cur_episode // 100 作为周期索引
    # 提示 2: 定义几个障碍物位置候选集,根据周期索引选择不同的集合
    # 提示 3: 障碍物不能与起点 (0,0) 或终点 (4,4) 重合

    periodic_idx = cur_episode // 100                           # 每 100 episode 更换

    # 定义几个障碍物配置(不覆盖起点和终点)
    obstacle_configs = [
        [(1, 1), (3, 3)],                                       # 配置 0
        [(1, 3), (3, 1)],                                       # 配置 1
        [(2, 2)],                                                # 配置 2
        [(1, 2), (2, 1), (3, 2)],                               # 配置 3
    ]

    # TODO: 根据 periodic_idx 选择障碍物配置
    # idx = periodic_idx % len(obstacle_configs)
    # return obstacle_configs[idx]

    obstacles = None  # ← TODO: 返回对应周期的障碍物列表
    return obstacles


# ---- 测试 TODO 3 ----
def test_moving_obstacle():
    """测试动态障碍功能。"""
    print("=" * 60)
    print("TODO 3 测试: 动态障碍物")
    print("=" * 60)

    env = MiniGridWorld()

    # 测试不同 episode 下障碍物是否变化
    test_episodes = [0, 50, 100, 199, 300, 399]
    results = []
    for ep in test_episodes:
        obs = add_moving_obstacle(env, ep)
        results.append((ep, obs))

    if results[0][1] is None:
        print("  TODO 未完成,请补全 add_moving_obstacle 函数")
        return

    for ep, obs in results:
        if obs:
            print(f"  Episode {ep:4d} → 障碍物: {obs}")
        else:
            print(f"  Episode {ep:4d} → 障碍物: [] (无障碍)")

    # 验证周期性——episodes 0 和 100 应该有相同配置(同属周期 0)
    if results[0][1] == results[2][1]:
        print("  ✓ 周期性正确! Episode 0 和 100 配置相同")
    else:
        print("  ✗ 周期性可能有问题: Episode 0 和 100 的配置不同")

    print()


# ============================================================================
# 主程序
# ============================================================================
if __name__ == "__main__":
    print("\n╔══════════════════════════════════════════════════════════════╗")
    print("║   s19 强化学习入门:MDP 与 Q-Learning — 动手练习            ║")
    print("║   请依次完成 TODO 1, 2, 3                                    ║")
    print("╚══════════════════════════════════════════════════════════════╝\n")

    test_q_update()
    test_epsilon_greedy()
    test_moving_obstacle()

    print("=" * 60)
    print("所有测试完成!请检查输出结果。")
    print("如有未通过的测试,请回到对应的 TODO 部分补全代码。")
    print("=" * 60)
    print()
    print("提示: 完成 TODO 后,可以回到 demo.py 查看完整实现作为参考。")
    print()