Skip to content

s07 多层网络的矩阵反向传播 — demo.py 代码详解

Download demo.py

运行方式

bash
cd s07_matrix_backprop/code
python demo.py

代码逐段详解

第1步:导入库 — 每个库是做什么的

python
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Callable
import os
  • numpy:科学计算核心库。提供多维数组(ndarray)、矩阵乘法(@)、随机数生成等。整个 MLP 的参数存储、前向计算、梯度计算全基于 NumPy。
  • matplotlib:可视化库。用于绘制决策边界、损失曲线、梯度范数变化、权重热力图。
  • typing:类型注解。Dict 用于参数字典和梯度的类型提示,List 用于每层维度的列表,Tuple 用于多返回值的类型注解,Callable 用于激活函数类型。

从标量到矩阵的跃迁:上一节 s06 的 Value 类是标量级别的自动微分,每个数字都是独立的节点。本节直接用 NumPy 矩阵批量操作,一个 ndarray 对象代表一整层神经元的输出。


第2步:激活函数及其导数 — 逐元素操作

每个激活函数都需要实现两个版本:前向函数φ(x))和导数函数φ'(x)),因为 δ 递推公式中需要 ϕ(Z[l])

ReLU

python
def relu(Z: np.ndarray) -> np.ndarray:
    return np.maximum(0, Z)

def relu_derivative(Z: np.ndarray) -> np.ndarray:
    return (Z > 0).astype(np.float64)

数学定义:

ReLU(Z)=max(0,Z),ReLU(Z)=1[Z>0]

np.maximum(0, Z) 逐元素比较,返回每个位置的非负值。(Z > 0) 生成布尔数组,.astype(np.float64) 将 True/False 转为 1.0/0.0——这是一个高效的逐元素"门控"实现。

Sigmoid

python
def sigmoid(Z: np.ndarray) -> np.ndarray:
    Z_clipped = np.clip(Z, -500, 500)   # 防止 exp 溢出
    return 1.0 / (1.0 + np.exp(-Z_clipped))

def sigmoid_derivative(Z: np.ndarray) -> np.ndarray:
    s = sigmoid(Z)
    return s * (1.0 - s)

数学公式:

σ(Z)=11+eZ,σ(Z)=σ(Z)(1σ(Z))

np.clip(Z, -500, 500) 是一个关键的数值稳定技巧:当 Z 是很大的负数时,eZ 会溢出浮点数范围(如 e80010347,远超 float64 上限)。将 Z 截断到 [500,500] 完全不影响结果(因为 σ(500)0σ(500)1),但彻底避免了溢出。

Tanh

python
def tanh(Z: np.ndarray) -> np.ndarray:
    return np.tanh(Z)

def tanh_derivative(Z: np.ndarray) -> np.ndarray:
    return 1.0 - np.tanh(Z) ** 2
tanh(Z)=1tanh2(Z)

激活函数注册表

python
ACTIVATION_REGISTRY = {
    "relu": (relu, relu_derivative),
    "sigmoid": (sigmoid, sigmoid_derivative),
    "tanh": (tanh, tanh_derivative),
    "linear": (lambda Z: Z, lambda Z: np.ones_like(Z)),
}

这个字典提供了一个工厂模式:通过字符串名称即可获取对应激活函数及其导数。linear 激活(恒等映射)的导数是全1矩阵——因为 xx=1


第3步:MLP 类的参数初始化

python
class MLP:
    def __init__(self, layer_dims: List[int], activations: List[str], seed: int = 42):
        for l in range(1, self.L + 1):
            n_in = layer_dims[l - 1]
            n_out = layer_dims[l]
            self.parameters[f"W{l}"] = np.random.randn(n_out, n_in) * np.sqrt(2.0 / n_in)
            self.parameters[f"b{l}"] = np.zeros((n_out, 1))

He 初始化

W[l]N(0,2nin)

为什么是 2/nin?He 初始化(Kaiming He, 2015)专门为 ReLU 激活函数设计。在前向传播中,如果权重方差为 2/nin,那么经过 ReLU 后输出的方差大约能保持稳定,不会逐层衰减或放大。这对深层网络的训练至关重要。

偏置统一初始化为零——偏置的初始值对梯度流的影响远小于权重,因为偏置只做平移,不参与乘法。

参数量统计:对于 [2,16,8,1] 的网络:

  • W1: 16×2=32 参数
  • b1: 16×1=16 参数
  • W2: 8×16=128 参数
  • b2: 8×1=8 参数
  • W3: 1×8=8 参数
  • b3: 1×1=1 参数
  • 总计:193 个参数(代码输出约为 203,因为最后一层也包含偏置)

第4步:前向传播 — 逐层计算 + 缓存中间值

python
def forward(self, X: np.ndarray) -> np.ndarray:
    self.caches = []
    A = X  # A[0] = X

    for l in range(1, self.L + 1):
        A_prev = A
        W = self.parameters[f"W{l}"]
        b = self.parameters[f"b{l}"]
        Z = W @ A_prev + b           # 线性变换 Z[l] = W[l] @ A[l-1] + b[l]
        act_fn, _ = ACTIVATION_REGISTRY[self.activations[l - 1]]
        A = act_fn(Z)                 # 非线性激活 A[l] = φ(Z[l])

        self.caches.append({
            "Z": Z,                   # 用于计算 φ'(Z[l])
            "A_prev": A_prev,         # 用于计算 dW[l] = δ @ (A_prev)^T
            "A": A,                   # 当前层输出(即下一层的输入)
        })

    return A

数学对应:

Z[l]=W[l]A[l1]+b[l]A[l]=ϕ[l](Z[l])

其中 A[0]=X 是输入数据。

缓存的三个值及其用途

缓存字段存储内容反向传播中的用途
Z线性输出 Z[l]计算 ϕ(Z[l])
A_prev上一层激活 A[l1]计算 dW[l]=δ[l](A[l1])T
A当前层激活 A[l]下一层前向传播的输入(即 A[l] 作为下一层的 Aprev

维度说明(以 W:(16,2),输入 (2,300) 为例):

  • Z=W@Aprev+b(16,2)@(2,300)+(16,1)=(16,300)(广播机制自动扩展 b
  • A=ReLU(Z):保持 (16,300)

第5步:反向传播 — δ 递推公式的核心实现

这是本节最关键的代码。完整实现了从输出层到输入层的梯度逆传。

python
def backward(self, Y: np.ndarray) -> Dict[str, np.ndarray]:
    m = Y.shape[1]
    self.grads = {}

    # ---- 步骤 1: 输出层的 δ[L] ----
    AL = self.caches[-1]["A"]                     # 预测值
    ZL = self.caches[-1]["Z"]                     # 输出层线性输出
    _, act_prime_fn = ACTIVATION_REGISTRY[self.activations[-1]]
    dAL = (1.0 / m) * (AL - Y)                     # ∂L/∂A[L]
    dZ = dAL * act_prime_fn(ZL)                    # δ[L] = ∇_A L ⊙ φ'(Z[L])

输出层 δ 的计算分为两步:

  1. 损失对激活的梯度(MSE 损失):
LA[L]=1m(A[L]Y)

其中 L=12m(A[L]Y)2。除以 m 是为了对 mini-batch 取平均——梯度是所有样本梯度的均值。

  1. 乘以激活函数的导数(链式法则):
δ[L]=LA[L]ϕ(Z[L])

注意:这是逐元素相乘*,Hadamard 积 ),不是矩阵乘法(@)。


隐藏层的 δ 递推

python
for l in reversed(range(1, self.L + 1)):
    cache = self.caches[l - 1]
    A_prev = cache["A_prev"]

    # 参数梯度
    self.grads[f"dW{l}"] = dZ @ A_prev.T
    self.grads[f"db{l}"] = np.sum(dZ, axis=1, keepdims=True)

    # 继续向前递推 δ
    if l > 1:
        W_next = self.parameters[f"W{l}"]
        Z_prev = self.caches[l - 2]["Z"]
        _, act_prime_fn_prev = ACTIVATION_REGISTRY[self.activations[l - 2]]
        dZ = (W_next.T @ dZ) * act_prime_fn_prev(Z_prev)

三步走,对应三个核心公式:

1. 权重梯度(外积)

LW[l]=δ[l](A[l1])T

注意:代码中没有显式除以 m,因为 dZ(即 δ[l])在输出层已经包含了 1m 因子。

维度验证δ[l] 形状 (n[l],m)(A[l1])T 形状 (m,n[l1]),乘积 (n[l],n[l1]) —— 与 W[l] 形状完全一致。

2. 偏置梯度

Lb[l]=1mi=1mδi[l]

np.sum(dZ, axis=1, keepdims=True) 对 300 个样本的误差信号按行求和,形状从 (n[l],m) 变为 (n[l],1)

3. δ 递推(核心)

δ[l1]=(W[l])Tδ[l]ϕ(Z[l1])
  • W_next.T @ dZ:将第 l 层的误差通过转置权重回传——这是"责任分配"的数学体现
  • * act_prime_fn_prev(Z_prev):经过激活函数的导数门控
  • 这个递推关系是反向传播的灵魂——一旦 δ[L] 被算出,所有层的 δ 和参数梯度都能通过统一的公式自动推导

第6步:参数更新 — 梯度下降

python
def update(self, learning_rate: float):
    for l in range(1, self.L + 1):
        self.parameters[f"W{l}"] -= learning_rate * self.grads[f"dW{l}"]
        self.parameters[f"b{l}"] -= learning_rate * self.grads[f"db{l}"]

标准梯度下降一步:

W[l]:=W[l]αLW[l]

注意这里没有使用任何优化器技巧(Momentum、Adam 等)——那是下一节 s08 的主题。


第7步:梯度检查 — 用有限差分验证解析梯度

梯度检查是手写反向传播时的黄金调试标准。本节使用双边有限差分法

python
def gradient_check(model, X, Y, epsilon=1e-7):
    # 对每个参数的每个元素逐一计算数值梯度
    for idx in ...:  # 遍历参数矩阵每个位置
        original_value = param[idx]

        param[idx] = original_value + epsilon
        loss_plus = model.compute_loss(model.forward(X), Y)

        param[idx] = original_value - epsilon
        loss_minus = model.compute_loss(model.forward(X), Y)

        grad_numeric[idx] = (loss_plus - loss_minus) / (2.0 * epsilon)
        param[idx] = original_value  # 恢复原始值

数学公式:

LθL(θ+ϵ)L(θϵ)2ϵ

为什么用双边差分而非单边? 单边差分 L(θ+ϵ)L(θ)ϵ 的误差是 O(ϵ),而双边差分的误差是 O(ϵ2),精度高得多。

相对误差公式

relative error=gradanalyticgradnumeric2gradanalytic2+gradnumeric2

解读标准:

  • <107:正确
  • 105:可能有小问题
  • >103:几乎肯定有 bug

重要警告:梯度检查极其缓慢——每个参数需要两次额外的前向传播。对于 200 个参数,需要用 400 次前向传播来验证一次梯度。这就是为什么实际训练中不使用梯度检查,只在开发验证时对极小网络+小 batch 使用。


第8步:数据生成 — 双月形二分类数据集

python
def make_moons_dataset(n_samples=200, noise=0.15, seed=0):
    # 上半月(类别 0):沿单位圆上半部分分布
    t = np.linspace(0, np.pi, n_samples_per_class)
    X0 = np.vstack([
        np.cos(t) + randn * noise,   # x 坐标 = cos(角度) + 噪声
        np.sin(t) + randn * noise,   # y 坐标 = sin(角度) + 噪声
    ])

    # 下半月(类别 1):偏移后的下半圆
    X1 = np.vstack([
        1 - np.cos(t) + randn * noise,
        1 - np.sin(t) - 0.5 + randn * noise,
    ])

双月形数据是一个经典的非线性二分类问题——一条直线无法分开两个类别,必须用非线性决策边界。这正好展示了 MLP(带 ReLU 隐藏层)的非线性表达能力的必要性。


第9步:可视化组件

决策边界可视化

python
def plot_decision_boundary(model, X, Y, title, filename):
    # 生成网格点
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                         np.linspace(y_min, y_max, 200))
    grid = np.vstack([xx.ravel(), yy.ravel()])
    Z = model.forward(grid)              # 模型对每个网格点的预测概率
    Z = Z.reshape(xx.shape)

    plt.contourf(xx, yy, Z, levels=[0, 0.5, 1])  # 填充决策区域
    plt.contour(xx, yy, Z, levels=[0.5])          # 画 p=0.5 的决策线

contourf 用不同颜色填充预测概率 <0.5>0.5 的区域,contourp=0.5 处画一条黑线——这就是模型的决策边界。训练前后各画一张,直观看到模型从"随机划分"到"学会分开两个月形"的过程。

权重热力图

python
axes[l, 0].imshow(model_before.parameters[f"W{l+1}"], cmap='RdBu_r')
axes[l, 1].imshow(model_after.parameters[f"W{l+1}"], cmap='RdBu_r')

imshow 将权重矩阵以颜色编码显示——红色代表正值权重,蓝色代表负值。训练前的权重是随机均匀的杂色,训练后的权重呈现有规律的模式,说明网络学到了有意义的结构。


第10步:训练循环 — 将一切串联

python
for epoch in range(n_epochs):
    Y_pred = model.forward(X)       # ① 前向传播
    loss = model.compute_loss(Y_pred, Y)  # ② 计算损失
    model.backward(Y)               # ③ 反向传播(计算梯度)
    model.update(learning_rate)     # ④ 参数更新

这就是深度学习训练的四步循环,在每一个 epoch 中重复:

  1. 前向:数据从输入流到输出
  2. 损失:量化预测与真实标签的差距
  3. 反向:从损失出发,梯度逆流回每个参数
  4. 更新:参数沿梯度反方向移动一步

关键概念速查表

概念数学公式代码实现
δ[L] 起手式ALϕ(Z[L])dAL = (1/m)*(AL - Y)dZ = dAL * act_prime_fn(ZL)
δ[l] 递推(W[l+1])Tδ[l+1]ϕ(Z[l])(W_next.T @ dZ) * act_prime_fn_prev(Z_prev)
权重梯度1mδ[l](A[l1])TdZ @ A_prev.T(外积)
偏置梯度1miδi[l]np.sum(dZ, axis=1, keepdims=True)
He 初始化WN(0,2/nin)randn * sqrt(2.0 / n_in)
梯度检查L(θ+ϵ)L(θϵ)2ϵ双边有限差分
梯度范数|g|2=gi2np.linalg.norm(grad)

完整代码

py
# -*- coding: utf-8 -*-
"""
s07 多层网络的矩阵反传 — 演示代码
==================================
功能:用纯 NumPy 实现完整的 MLP(前向 + 矩阵反向传播 + 参数更新),
      包括梯度检查、梯度范数监控、决策边界可视化和训练曲线。

运行方式:在 s07_matrix_backprop/ 目录下执行 python code/demo.py
"""

import numpy as np
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False
from typing import Dict, List, Tuple, Callable
import os

_HERE = os.path.dirname(os.path.abspath(__file__))  # demo.py 所在目录
_IMAGES = os.path.join(_HERE, '..', 'images')        # 章节 images/ 目录
os.makedirs(_IMAGES, exist_ok=True)

# ============================================================================
# 第一部分:激活函数及其导数
# ============================================================================

def relu(Z: np.ndarray) -> np.ndarray:
    """ReLU 激活函数: max(0, Z)"""
    return np.maximum(0, Z)


def relu_derivative(Z: np.ndarray) -> np.ndarray:
    """ReLU 的导数: 1 if Z > 0 else 0"""
    return (Z > 0).astype(np.float64)


def sigmoid(Z: np.ndarray) -> np.ndarray:
    """Sigmoid 激活函数: 1 / (1 + e^{-Z})"""
    Z_clipped = np.clip(Z, -500, 500)  # 防止 exp 溢出
    return 1.0 / (1.0 + np.exp(-Z_clipped))


def sigmoid_derivative(Z: np.ndarray) -> np.ndarray:
    """Sigmoid 的导数: σ(Z) * (1 - σ(Z))"""
    s = sigmoid(Z)
    return s * (1.0 - s)


def tanh(Z: np.ndarray) -> np.ndarray:
    """Tanh 激活函数"""
    return np.tanh(Z)


def tanh_derivative(Z: np.ndarray) -> np.ndarray:
    """Tanh 的导数: 1 - tanh^2(Z)"""
    return 1.0 - np.tanh(Z) ** 2


# 激活函数注册表:方便按字符串名称查找
ACTIVATION_REGISTRY = {
    "relu": (relu, relu_derivative),
    "sigmoid": (sigmoid, sigmoid_derivative),
    "tanh": (tanh, tanh_derivative),
    "linear": (lambda Z: Z, lambda Z: np.ones_like(Z)),  # 线性激活(恒等映射)
}


# ============================================================================
# 第二部分:MLP 类 —— 完整的正向 + 反向传播
# ============================================================================

class MLP:
    """
    多层感知机,使用矩阵形式的反向传播。

    支持任意层数、任意激活函数,以及 mini-batch 训练。

    参数:
        layer_dims: 每层神经元数量,如 [2, 8, 4, 1]
        activations: 每层激活函数名称列表,如 ["relu", "relu", "sigmoid"]
        seed: 随机种子
    """

    def __init__(self, layer_dims: List[int], activations: List[str], seed: int = 42):
        """初始化网络参数(He 初始化)"""
        np.random.seed(seed)
        self.L = len(layer_dims) - 1  # 网络层数(不含输入层)
        self.activations = activations  # 每层的激活函数名称
        self.parameters = {}           # 参数字典: W1, b1, W2, b2, ...
        self.caches = []               # 前向传播缓存列表(每层一个 dict)

        for l in range(1, self.L + 1):
            n_in = layer_dims[l - 1]    # 输入维度
            n_out = layer_dims[l]       # 输出维度
            # He 初始化:W ~ N(0, sqrt(2/n_in)),特别适合配合 ReLU
            self.parameters[f"W{l}"] = np.random.randn(n_out, n_in) * np.sqrt(2.0 / n_in)
            self.parameters[f"b{l}"] = np.zeros((n_out, 1))  # 偏置零初始化

        self.grads = {}  # 存储每层参数梯度的字典

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        前向传播:计算模型输出,同时缓存中间值供反向传播使用。

        参数:
            X: 输入数据,shape (n_features, m_samples)

        返回:
            A[L]: 最后一层的激活输出,即模型的预测值
        """
        self.caches = []  # 清空缓存
        A = X             # A[0] = X(输入就是第 0 层的激活)

        for l in range(1, self.L + 1):
            A_prev = A                                        # 保存上一层激活值
            W = self.parameters[f"W{l}"]                      # 获取权重矩阵
            b = self.parameters[f"b{l}"]                      # 获取偏置向量
            Z = W @ A_prev + b                                 # 线性变换: Z[l] = W[l] @ A[l-1] + b[l]

            # 获取激活函数及其导数函数
            act_fn, _ = ACTIVATION_REGISTRY[self.activations[l - 1]]
            A = act_fn(Z)                                      # 非线性激活: A[l] = φ[l](Z[l])

            # 将中间值存入缓存
            self.caches.append({
                "Z": Z,          # Z[l] —— 反向传播中计算 φ'(Z[l]) 时需要
                "A_prev": A_prev,  # A[l-1] —— 反向传播中计算 dW[l] 时需要
                "A": A,          # A[l] —— 当前层输出,同时是下一层的输入
            })

        return A  # 返回最后一层的激活(模型预测值)

    def backward(self, Y: np.ndarray) -> Dict[str, np.ndarray]:
        """
        反向传播:使用矩阵形式的 δ 递推公式计算所有参数的梯度。

        核心公式:
          δ[L] = ∇_A L ⊙ φ'(Z[L])                             (输出层)
          δ[l] = (W[l+1])^T @ δ[l+1] ⊙ φ'(Z[l])              (隐藏层递推)
          dW[l] = (1/m) · δ[l] @ (A[l-1])^T                   (权重梯度)
          db[l] = (1/m) · sum(δ[l], axis=1, keepdims=True)    (偏置梯度)

        参数:
            Y: 标签,shape (n_output, m_samples),与最后一层输出 shape 一致

        返回:
            grads: 字典,包含每层的 dW{l} 和 db{l}
        """
        m = Y.shape[1]  # mini-batch 大小
        self.grads = {}  # 清空梯度字典

        # ---- 步骤 1: 输出层的 δ[L] ----
        # 损失函数使用 MSE: L = (1/(2m)) * Σ(A[L] - Y)²
        # ∂L/∂A[L] = (1/m) * (A[L] - Y)
        AL = self.caches[-1]["A"]                    # 最后一层的激活(预测值)
        ZL = self.caches[-1]["Z"]                    # 最后一层的线性输出
        _, act_prime_fn = ACTIVATION_REGISTRY[self.activations[-1]]  # 输出层激活函数的导数
        dAL = (1.0 / m) * (AL - Y)                   # ∂L/∂A[L] —— MSE 损失的梯度
        dZ = dAL * act_prime_fn(ZL)                  # δ[L] = ∇_A L ⊙ φ'(Z[L])

        # ---- 步骤 2: 隐藏层的 δ 递推 ----
        for l in reversed(range(1, self.L + 1)):
            cache = self.caches[l - 1]               # 第 l 层的缓存
            A_prev = cache["A_prev"]                 # A[l-1]

            # 计算参数梯度
            self.grads[f"dW{l}"] = dZ @ A_prev.T    # dW[l] = δ[l] @ (A[l-1])^T
            # 注意:这里已经在 dZ 中包含了 (1/m) 因子
            self.grads[f"db{l}"] = np.sum(dZ, axis=1, keepdims=True)  # db[l] = Σ_i δ_i[l]

            # 如果不是第一层,继续向前传播 δ
            if l > 1:
                W_next = self.parameters[f"W{l}"]    # W[l](当前层权重——从前一层角度看是 W[l+1])
                Z_prev = self.caches[l - 2]["Z"]     # Z[l-1]
                _, act_prime_fn_prev = ACTIVATION_REGISTRY[self.activations[l - 2]]
                # δ[l-1] = (W[l])^T @ δ[l] ⊙ φ'[l-1](Z[l-1])
                dZ = (W_next.T @ dZ) * act_prime_fn_prev(Z_prev)

        return self.grads

    def update(self, learning_rate: float):
        """
        使用梯度下降更新所有参数。

        W[l] := W[l] - α · dW[l]
        b[l] := b[l] - α · db[l]

        参数:
            learning_rate: 学习率 α
        """
        for l in range(1, self.L + 1):
            self.parameters[f"W{l}"] -= learning_rate * self.grads[f"dW{l}"]
            self.parameters[f"b{l}"] -= learning_rate * self.grads[f"db{l}"]

    def compute_loss(self, Y_pred: np.ndarray, Y_true: np.ndarray) -> float:
        """
        计算 MSE 损失。

        参数:
            Y_pred: 模型预测值
            Y_true: 真实标签

        返回:
            MSE 损失值(标量)
        """
        m = Y_true.shape[1]  # 样本数
        return np.mean((Y_pred - Y_true) ** 2) / 2.0  # (1/2) * MSE

    def get_gradient_norms(self) -> Dict[str, float]:
        """
        计算每层参数梯度的 L2 范数,用于监控训练健康度。

        返回:
            norms: 字典,key 为参数名,value 为梯度 L2 范数
        """
        norms = {}
        for l in range(1, self.L + 1):
            norms[f"|dW{l}|"] = np.linalg.norm(self.grads[f"dW{l}"])
            norms[f"|db{l}|"] = np.linalg.norm(self.grads[f"db{l}"])
        return norms


# ============================================================================
# 第三部分:梯度检查
# ============================================================================

def gradient_check(model: MLP, X: np.ndarray, Y: np.ndarray, epsilon: float = 1e-7) -> float:
    """
    使用双边有限差分法验证解析梯度。

    对每个参数 θ,数值梯度近似为:
        ∂L/∂θ ≈ (L(θ+ε) - L(θ-ε)) / (2ε)

    参数:
        model: MLP 模型
        X: 输入数据(使用小 batch 以提高速度)
        Y: 标签
        epsilon: 微小扰动

    返回:
        max_rel_error: 最大相对误差
    """
    # 先计算解析梯度
    Y_pred = model.forward(X)
    model.backward(Y)

    max_rel_error = 0.0  # 记录最大相对误差

    for l in range(1, model.L + 1):
        for param_name in [f"W{l}", f"b{l}"]:
            param = model.parameters[param_name]     # 原始参数矩阵
            grad_analytic = model.grads[f"d{param_name}"]  # 解析梯度
            grad_numeric = np.zeros_like(param)          # 数值梯度矩阵

            # 对参数矩阵中的每个元素,逐一计算数值梯度
            # 注意:对于大网络这会很慢,这里仅用于教学演示
            it = np.nditer(param, flags=['multi_index'])
            while not it.finished:
                idx = it.multi_index  # 当前元素的多维索引
                original_value = param[idx]  # 保存原始值

                # 计算 L(θ + ε)
                param[idx] = original_value + epsilon
                Y_pred_plus = model.forward(X)
                loss_plus = model.compute_loss(Y_pred_plus, Y)

                # 计算 L(θ - ε)
                param[idx] = original_value - epsilon
                Y_pred_minus = model.forward(X)
                loss_minus = model.compute_loss(Y_pred_minus, Y)

                # 双边差分近似梯度
                grad_numeric[idx] = (loss_plus - loss_minus) / (2.0 * epsilon)

                # 恢复原始值
                param[idx] = original_value
                it.iternext()

            # 计算相对误差
            numerator = np.linalg.norm(grad_analytic - grad_numeric)
            denominator = np.linalg.norm(grad_analytic) + np.linalg.norm(grad_numeric)
            rel_error = numerator / max(denominator, 1e-10)  # 避免除以零

            max_rel_error = max(max_rel_error, rel_error)
            print(f"  {param_name}: 相对误差 = {rel_error:.2e}")

    return max_rel_error


# ============================================================================
# 第四部分:数据生成
# ============================================================================

def make_moons_dataset(n_samples: int = 200, noise: float = 0.15, seed: int = 0) -> Tuple[np.ndarray, np.ndarray]:
    """
    生成双月形二分类数据集(类似 sklearn 的 make_moons)。

    参数:
        n_samples: 样本总数
        noise: 噪声标准差
        seed: 随机种子

    返回:
        X: 特征矩阵,shape (2, n_samples)
        Y: 标签矩阵,shape (1, n_samples)
    """
    np.random.seed(seed)
    n_samples_per_class = n_samples // 2

    # 上半月(类别 0)
    t = np.linspace(0, np.pi, n_samples_per_class)  # 角度从 0 到 π
    X0 = np.vstack([
        np.cos(t) + np.random.randn(n_samples_per_class) * noise,  # x 坐标 + 噪声
        np.sin(t) + np.random.randn(n_samples_per_class) * noise,  # y 坐标 + 噪声
    ])
    Y0 = np.zeros((1, n_samples_per_class))  # 标签 0

    # 下半月(类别 1)
    X1 = np.vstack([
        1 - np.cos(t) + np.random.randn(n_samples_per_class) * noise,  # 右移 + 噪声
        1 - np.sin(t) - 0.5 + np.random.randn(n_samples_per_class) * noise,  # 下移 + 噪声
    ])
    Y1 = np.ones((1, n_samples_per_class))  # 标签 1

    # 合并两个类别并打乱顺序
    X = np.hstack([X0, X1])
    Y = np.hstack([Y0, Y1])
    # 随机打乱
    idx = np.random.permutation(n_samples)
    X, Y = X[:, idx], Y[:, idx]

    return X, Y


# ============================================================================
# 第五部分:可视化
# ============================================================================

def plot_decision_boundary(model: MLP, X: np.ndarray, Y: np.ndarray, title: str, filename: str):
    """
    绘制分类决策边界。

    参数:
        model: 训练好的 MLP 模型
        X: 输入特征(用于确定绘图范围)
        Y: 标签(用于着色散点)
        title: 图表标题
        filename: 保存文件名
    """
    # 确定绘图范围:在数据范围基础上扩展一点边距
    x_min, x_max = X[0, :].min() - 0.5, X[0, :].max() + 0.5
    y_min, y_max = X[1, :].min() - 0.5, X[1, :].max() + 0.5

    # 生成网格点
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                         np.linspace(y_min, y_max, 200))
    grid = np.vstack([xx.ravel(), yy.ravel()])  # 将网格展开为 (2, N) 的矩阵
    Z = model.forward(grid)                       # 模型对每个网格点的预测
    Z = Z.reshape(xx.shape)                       # 恢复为网格形状

    plt.figure(figsize=(8, 6))
    # 绘制决策边界背景(蓝色=类别0区域,红色=类别1区域)
    plt.contourf(xx, yy, Z, levels=[0, 0.5, 1], alpha=0.3,
                 colors=['#4A90D9', '#E74C3C'])
    # 绘制决策边界线(p=0.5 的等高线)
    plt.contour(xx, yy, Z, levels=[0.5], colors='black', linewidths=2)

    # 绘制数据点
    plt.scatter(X[0, Y[0, :] == 0], X[1, Y[0, :] == 0],
                c='#4A90D9', edgecolors='white', s=50, label='Class 0')
    plt.scatter(X[0, Y[0, :] == 1], X[1, Y[0, :] == 1],
                c='#E74C3C', edgecolors='white', s=50, label='Class 1')

    plt.title(title, fontsize=14, fontweight='bold')
    plt.xlabel('Feature 1 (x₁)', fontsize=12)
    plt.ylabel('Feature 2 (x₂)', fontsize=12)
    plt.legend()
    plt.tight_layout()
    out = os.path.join(_IMAGES, filename)
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] {title} 已保存至 {out}")


def plot_training_curves(losses: List[float], grad_norms_history: List[Dict]):
    """
    绘制训练过程中的损失曲线和梯度范数变化。

    参数:
        losses: 每个 epoch 的损失值列表
        grad_norms_history: 每个 epoch 的梯度范数字典列表
    """
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # ---- 左图:损失曲线 ----
    axes[0].plot(losses, 'b-', linewidth=2, alpha=0.8)
    axes[0].set_xlabel('Epoch', fontsize=12)
    axes[0].set_ylabel('Loss (MSE)', fontsize=12)
    axes[0].set_title('Training Loss Curve', fontsize=14, fontweight='bold')
    axes[0].grid(True, alpha=0.3)
    axes[0].set_yscale('log')  # 对数刻度更容易观察收敛趋势

    # ---- 右图:梯度范数变化 ----
    if grad_norms_history:
        # 提取每层权重的梯度范数
        L = model_for_plot.L  # 需要通过全局或其他方式传入
        for l in range(1, len(grad_norms_history[0]) // 2 + 1):
            key = f"|dW{l}|"
            if key in grad_norms_history[0]:
                values = [h[key] for h in grad_norms_history]
                axes[1].plot(values, linewidth=1.5, alpha=0.7,
                             label=f'Layer {l} |dW|')

    axes[1].set_xlabel('Epoch', fontsize=12)
    axes[1].set_ylabel('Gradient L2 Norm', fontsize=12)
    axes[1].set_title('Gradient Norm Monitoring', fontsize=14, fontweight='bold')
    axes[1].legend(fontsize=9)
    axes[1].grid(True, alpha=0.3)
    axes[1].set_yscale('log')

    plt.tight_layout()
    out = os.path.join(_IMAGES, 'training_curves.png')
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 训练曲线已保存至 {out}")


def plot_weight_heatmaps(model_before: MLP, model_after: MLP):
    """
    绘制训练前后权重矩阵的热力图对比。

    参数:
        model_before: 训练前的模型
        model_after: 训练后的模型
    """
    L = model_before.L
    fig, axes = plt.subplots(L, 2, figsize=(10, 3 * L))

    if L == 1:
        axes = axes.reshape(1, -1)  # 确保索引一致性

    for l in range(L):
        # 训练前的权重
        im1 = axes[l, 0].imshow(model_before.parameters[f"W{l+1}"],
                                cmap='RdBu_r', aspect='auto')
        axes[l, 0].set_title(f'W[{l+1}] Before Training', fontsize=11)
        plt.colorbar(im1, ax=axes[l, 0])

        # 训练后的权重
        im2 = axes[l, 1].imshow(model_after.parameters[f"W{l+1}"],
                                cmap='RdBu_r', aspect='auto')
        axes[l, 1].set_title(f'W[{l+1}] After Training', fontsize=11)
        plt.colorbar(im2, ax=axes[l, 1])

    plt.suptitle('Weight Matrix Heatmaps: Before vs After Training', fontsize=14, fontweight='bold')
    plt.tight_layout()
    out = os.path.join(_IMAGES, 'weight_heatmaps.png')
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 权重热力图已保存至 {out}")


# ============================================================================
# 第六部分:主程序
# ============================================================================

# 全局变量:用于在 plot_training_curves 中引用模型
model_for_plot = None


def main():
    global model_for_plot

    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║   s07 多层网络的矩阵反传 — 完整 MLP 训练 (正向+反向+更新)      ║")
    print("╚══════════════════════════════════════════════════════════════════╝")

    # ---- 1. 生成数据集 ----
    print("\n[数据] 生成双月形二分类数据集...")
    X, Y = make_moons_dataset(n_samples=300, noise=0.15, seed=42)
    print(f"  X shape: {X.shape}, Y shape: {Y.shape}")
    print(f"  类别 0 样本数: {(Y == 0).sum()}, 类别 1 样本数: {(Y == 1).sum()}")

    # ---- 2. 初始化模型 ----
    # 2 输入 → 16 隐藏(ReLU) → 8 隐藏(ReLU) → 1 输出(Sigmoid)
    layer_dims = [2, 16, 8, 1]
    activations = ["relu", "relu", "sigmoid"]

    print(f"\n[模型] 网络结构: {layer_dims}")
    print(f"  激活函数: {activations}")
    print(f"  总层数: {len(layer_dims) - 1}")

    model = MLP(layer_dims, activations, seed=42)
    model_for_plot = model

    total_params = sum(p.size for p in model.parameters.values())
    print(f"  总参数量: {total_params}")

    # ---- 3. 梯度检查 ----
    print("\n[梯度检查] 使用有限差分验证解析梯度...")
    print("  (仅在小 batch 上检查少量参数,实际训练不会这样慢)")
    # 取少量样本用于快速检查
    X_check = X[:, :5]
    Y_check = Y[:, :5]
    max_error = gradient_check(model, X_check, Y_check, epsilon=1e-7)
    if max_error < 1e-5:
        print(f"  ✓ 梯度检查通过!最大相对误差: {max_error:.2e}")
    else:
        print(f"  ⚠ 最大相对误差: {max_error:.2e},建议检查反向传播实现")

    # ---- 4. 保存训练前的模型副本 ----
    # 深拷贝参数(用于训练前后对比)
    model_before_params = {}
    for key, val in model.parameters.items():
        model_before_params[key] = val.copy()

    # ---- 5. 训练循环 ----
    learning_rate = 0.5
    n_epochs = 2000
    print(f"\n[训练] 学习率={learning_rate}, Epochs={n_epochs}")

    losses = []
    grad_norms_history = []

    for epoch in range(n_epochs):
        # ---- 前向传播 ----
        Y_pred = model.forward(X)

        # ---- 计算损失 ----
        loss = model.compute_loss(Y_pred, Y)
        losses.append(loss)

        # ---- 反向传播 ----
        model.backward(Y)

        # ---- 记录梯度范数 ----
        grad_norms = model.get_gradient_norms()
        grad_norms_history.append(grad_norms)

        # ---- 参数更新 ----
        model.update(learning_rate)

        # ---- 打印训练进度 ----
        if epoch % 400 == 0 or epoch == n_epochs - 1:
            accuracy = np.mean((Y_pred > 0.5) == Y)  # 二分类准确率
            dw1_norm = grad_norms.get("|dW1|", 0)
            dw2_norm = grad_norms.get("|dW2|", 0)
            dw3_norm = grad_norms.get("|dW3|", 0)
            print(f"  Epoch {epoch:4d}: loss={loss:.6f}, accuracy={accuracy:.4f}, "
                  f"|dW1|={dw1_norm:.4f}, |dW2|={dw2_norm:.4f}, |dW3|={dw3_norm:.4f}")

    # ---- 6. 最终评估 ----
    Y_pred_final = model.forward(X)
    final_accuracy = np.mean((Y_pred_final > 0.5) == Y)
    print(f"\n[结果] 最终训练准确率: {final_accuracy:.4f} ({final_accuracy*100:.1f}%)")

    # ---- 7. 可视化 ----
    print("\n[可视化] 生成图表...")

    # 决策边界:训练前
    model_before = MLP(layer_dims, activations, seed=42)
    plot_decision_boundary(model_before, X, Y,
                           'Decision Boundary Before Training', 'decision_boundary_before.png')

    # 决策边界:训练后
    plot_decision_boundary(model, X, Y,
                           f'Decision Boundary After Training (Accuracy: {final_accuracy:.1%})',
                           'decision_boundary_after.png')

    # 训练曲线
    plot_training_curves(losses, grad_norms_history)

    # 权重热力图
    plot_weight_heatmaps(model_before, model)

    # ---- 8. 总结 ----
    print("\n" + "=" * 70)
    print("【总结】")
    print("=" * 70)
    print(f"  ✓ 实现了完整的前向传播、矩阵反向传播和参数更新")
    print(f"  ✓ 通过梯度检查验证了反向传播的正确性")
    print(f"  ✓ 在双月形数据集上达到了 {final_accuracy*100:.1f}% 的准确率")
    print(f"  ✓ 每层梯度范数的变化表明训练过程健康(无消失或爆炸)")
    print(f"\n  核心公式回顾:")
    print(f"    δ[L] = ∇_A L ⊙ φ'(Z[L])")
    print(f"    δ[l] = (W[l+1])^T @ δ[l+1] ⊙ φ'(Z[l])")
    print(f"    dW[l] = (1/m) · δ[l] @ (A[l-1])^T")
    print(f"    db[l] = (1/m) · Σ δ[l]")
    print("=" * 70)


if __name__ == "__main__":
    main()