Skip to content

s06 反向传播与链式法则 — demo.py 代码详解

Download demo.py

运行方式

bash
cd s06_backprop_chain_rule/code
python demo.py

代码逐段详解

第1步:导入库 — 每个库是做什么的

python
import os
import math
from typing import Set, List, Tuple
  • os:操作系统接口库。此处用于创建 images/ 输出目录,确保图片保存路径存在。
  • math:数学函数库。提供 math.exp()(指数函数)、math.tanh()(双曲正切)等底层数学运算,用于实现 Sigmoid、Tanh 等激活函数。
  • typing.Set, List, Tuple:类型注解。Set 用于记录计算图中已访问的节点,List 用于拓扑排序结果列表,Tuple 用于节点的前驱元组。

为什么不用 NumPy? 因为这个 demo 的目标是展示自动微分的底层原理——每个数值都是标量 Value,而非 NumPy 数组。下一节 s07 才扩展到矩阵/向量级别的反向传播,届时才需要 NumPy。


第2步:Value 类 — 自动微分的核心节点

整个 demo 的基石是 Value 类。每个 Value 对象就是计算图中的一个节点,它存储四样东西:

属性含义用途
data该节点的数值(标量)前向传播的结果值
grad累积的梯度 Lnode反向传播时累加
_backward局部反向传播函数(闭包)定义该操作对输入的梯度如何分配
_prev前驱节点集合用于拓扑排序,确定反向传播顺序
python
class Value:
    def __init__(self, data: float, _children: Tuple = (), _op: str = ""):
        self.data = data                # 存储数值
        self.grad = 0.0                 # 梯度初始化为 0
        self._backward = lambda: None   # 默认无操作(叶子节点)
        self._prev = set(_children)     # 前驱节点集合
        self._op = _op                  # 操作名称(如 "+", "*", "ReLU")

设计要点

  • grad 初始为 0,反传时通过 += 累加而非 = 赋值——这是因为一个变量可能被多条路径使用(fan-out),梯度需要求和。
  • _backward 是一个闭包(closure),它捕获了当前操作的局部上下文(如两个输入的 data 值),从而在反向时无需重新计算。
  • _childrenset 存储,去重一个节点被同一父节点多次引用的情况。

第3步:基本算术运算 — 局部梯度规则

每个运算 __add____mul__ 等都实现了两个关键部分:前向计算局部反向传播闭包

加法门 __add__

python
def __add__(self, other):
    other = other if isinstance(other, Value) else Value(other)
    out = Value(self.data + other.data, (self, other), '+')

    def _backward():
        self.grad += 1.0 * out.grad   # ∂(a+b)/∂a = 1
        other.grad += 1.0 * out.grad  # ∂(a+b)/∂b = 1

    out._backward = _backward
    return out

数学依据:加法门是梯度分发器

(a+b)a=1,(a+b)b=1

因此上游梯度 out.grad(即 Lout)原样传递给两个输入。链式法则:La=Loutouta=out.grad1

乘法门 __mul__

python
def __mul__(self, other):
    other = other if isinstance(other, Value) else Value(other)
    out = Value(self.data * other.data, (self, other), '*')

    def _backward():
        self.grad += other.data * out.grad   # ∂(a*b)/∂a = b
        other.grad += self.data * out.grad   # ∂(a*b)/∂b = a

    out._backward = _backward
    return out

数学依据:乘法门的梯度是交换的(gradient switcheroo)。

(ab)a=b,(ab)b=a

注意:闭包中捕获的是 other.dataself.data 的值(前向时的值),而非反向时的值。这是自动微分的标准做法——前向传播时存储中间值,反向传播时使用。

幂运算 __pow__

python
def __pow__(self, other):
    assert isinstance(other, (int, float)), "仅支持数值指数"
    out = Value(self.data ** other, (self,), f'**{other}')

    def _backward():
        self.grad += (other * self.data ** (other - 1)) * out.grad

    out._backward = _backward
    return out

数学公式:

(xn)x=nxn1

比如 x2 的导数是 2x。幂运算只有 self 一个输入参数(_children 中只有 self),所以反向传播时只有一个梯度接收者。

除法 __truediv__:巧妙的复用法

python
def __truediv__(self, other):
    return self * other ** -1

这里没有单独实现除法的反向传播,而是利用了已有的乘法和幂运算的组合a/b=a×b1。这是工程上非常聪明的做法——由于 __mul____pow__ 已经分别定义了正确的 _backward,除法门的梯度会通过链式法则自动正确。

数学验证:

(a/b)a=1b,(a/b)b=ab2

b ** -1 的反向传播与 a * (b**-1) 的反向传播组合时,链式法则会自动产出正确结果。


第4步:激活函数 — 非线性变换的反向传播

激活函数是神经网络非线性的来源。每个激活函数需要定义前向计算导数公式

ReLU:梯度门控

python
def relu(self):
    out = Value(max(0.0, self.data), (self,), 'ReLU')

    def _backward():
        self.grad += (out.data > 0) * out.grad

    out._backward = _backward
    return out

数学定义与导数:

ReLU(x)=max(0,x)ReLU(x)={1if x>00if x0

为什么此处用它? ReLU 是隐藏层最常用的激活函数。它的导数非0即1,没有饱和区(不像 Sigmoid/Tanh 在两端导数趋近0),因此能有效缓解梯度消失问题。

代码细节(out.data > 0) 在 Python 中产生布尔值 True/False,但在算术运算中 True=1、False=0。因此整个表达式相当于"如果前向值 > 0,梯度通过;否则截断为0"。这是一个优雅的梯度门控(gating)实现。

Sigmoid:数值稳定技巧

python
def sigmoid(self):
    x = self.data
    if x >= 0:
        s = 1.0 / (1.0 + math.exp(-x))      # 标准公式
    else:
        exp_x = math.exp(x)
        s = exp_x / (1.0 + exp_x)            # 数值稳定版本

    out = Value(s, (self,), 'Sigmoid')

    def _backward():
        self.grad += out.data * (1 - out.data) * out.grad

    out._backward = _backward
    return out

数学定义与导数:

σ(x)=11+exσ(x)=σ(x)(1σ(x))

Sigmoid 导数的优美性质是可以用前向输出直接计算导数,无需知道原始输入 x。这在工程上很方便——反向传播时直接用 out.data 即可。

数值稳定技巧:当 x 是非常大的负数时,ex 可能溢出(如 x=500e500 超出浮点数上限)。因此当 x<0 时,改用等效公式 σ(x)=ex1+ex,避免了 ex 的计算。

Tanh:零中心输出

python
def tanh(self):
    t = math.tanh(self.data)
    out = Value(t, (self,), 'Tanh')

    def _backward():
        self.grad += (1 - out.data ** 2) * out.grad

    out._backward = _backward
    return out

数学定义与导数:

tanh(x)=exexex+extanh(x)=1tanh2(x)

与 Sigmoid 类似,Tanh 的导数也可以用前向输出直接计算。Tanh 的输出范围是 (1,1),以零为中心(不像 Sigmoid 输出全正),因此在隐藏层有时优于 Sigmoid。

Exp:导数等于自身

python
def exp(self):
    out = Value(math.exp(self.data), (self,), 'exp')

    def _backward():
        self.grad += out.data * out.grad

    out._backward = _backward
    return out

数学公式:ddxex=ex。Exp 函数的导数就是它自己的值——这是指数函数最独特的性质,也是 softmax 中常用的基本操作。


第5步:backward() — 拓扑排序 + 逆序执行

这是整个自动微分引擎的核心。backward() 方法的职责是:从当前节点(通常是损失 L)出发,按正确的顺序调用计算图中每个节点的 _backward()

python
def backward(self):
    # ---- 步骤 1: 拓扑排序(DFS 实现) ----
    topo = []
    visited = set()

    def build_topo(v: Value):
        if v not in visited:
            visited.add(v)
            for child in v._prev:       # 递归访问所有前驱
                build_topo(child)
            topo.append(v)              # 后序遍历:子节点在前

    build_topo(self)

    # ---- 步骤 2: 初始化梯度 ----
    self.grad = 1.0   # ∂L/∂L = 1

    # ---- 步骤 3: 按拓扑逆序调用 backward ----
    for node in reversed(topo):
        node._backward()

为什么需要拓扑排序?

反向传播要求按计算图的逆序执行:先计算离输出近的节点的梯度,再计算离输入近的。拓扑排序确保了:当调用节点 v_backward() 时,它的所有后继(路径上更靠近输出的节点)的梯度已经计算完毕。

具体来说,build_topo 使用 DFS 后序遍历

  1. 从根节点(损失 L)出发,沿 _prev(前驱关系)反向遍历
  2. 后序遍历确保:子节点(离输入近的)先被 topo.append(),父节点后
  3. 最终 reversed(topo) 得到的顺序就是:从 L 出发,逐层向输入传播

初始梯度为什么是 1.0?

LL=1

损失对自身的导数恒为 1。这是整个反向传播中唯一"手动赋值"的梯度——其余所有梯度都通过链式法则和局部闭包自动计算。


第6步:演示1 — 基本表达式的反向传播

表达式:L=(a×b+c)×d

给定:a=2,b=3,c=4,d=5

python
a = Value(2.0)
b = Value(3.0)
c = Value(4.0)
d = Value(5.0)

e = a * b        # e = 6
f = e + c        # f = 10
L = f * d        # L = 50

前向传播按顺序构建计算图:a×be+cf×dL

调用 L.backward() 后,代码按以下顺序自动计算梯度:

  1. LL=1
  2. f(乘法门 L=f×d):Lf=d1=5Ld=f1=10
  3. e(加法门 f=e+c):Le=1Lf=5Lc=5
  4. a(乘法门 e=a×b):La=bLe=3×5=15Lb=a5=2×5=10

验证:

La=db=5×3=15Lb=da=5×2=10Lc=d1=5Ld=ab+c=10

第7步:演示2 — 激活函数的反向传播验证

这段代码选了 x=1.5 作为测试点,依次计算三种激活函数的前向值和梯度:

  • ReLUx=1.5>0,所以 ReLU(1.5)=1.5,梯度 =1.0
  • Sigmoidσ(1.5)0.8176,梯度 σ(1.5)(1σ(1.5))0.1491
  • Tanhtanh(1.5)0.9051,梯度 10.905120.1807

另外测试了负输入 x=1.5 时的 ReLU:ReLU(1.5)=0,梯度 =0(梯度被截断)。

关键代码模式:每次测试前调用 x.zero_grad() 清零梯度,否则前一次测试的梯度会残留并与本次累加,导致结果错误。这个模式与 PyTorch 的 optimizer.zero_grad() 完全对应。


第8步:演示3 — Fan-out 梯度累积

表达式:L=(2x)×(x+3),取 x=2

python
x = Value(2.0)
u = x * 2      # 路径1: x → u
v = x + 3      # 路径2: x → v
L = u * v      # 两条路径汇合

x 同时影响了 u(通过 ×2)和 v(通过 +3),两个影响最终在 L 处汇合。根据多元微积分中的全导数公式:

Lx=Luux+Lvvx

代入数值:

  • Lu=v=5ux=2,路径1贡献 =5×2=10
  • Lv=u=4vx=1,路径2贡献 =4×1=4

总梯度 =10+4=14

这是 grad += ... 而非 grad = ... 的直接原因——同一个变量出现在计算图的多条路径上时,每条路径的反向传播都会向该变量的 grad 中贡献一份,最终 grad 是所有路径贡献的


第9步:演示4 — 小神经网络完整训练

该演示构建了一个 2 输入 → 4 隐藏(ReLU) → 1 输出(线性) 的 MLP,在 4 个合成样本上训练 100 轮。

训练数据:目标函数 y=3x122x2+1,用 4 个 (x1,x2) 对生成标签。

python
model = MLP(2, [4, 1], ["relu", "linear"])

训练循环的三步曲

  1. 前向传播 (model(x)):输入经过 LayerNeuron 的计算链,最终得到预测值。
  2. 计算损失:MSE 损失 L=(ypredytrue)2(未除以 N,但不影响优化方向)。
  3. 反向传播 + 更新
    python
    for p in model.parameters():
        p.grad = 0.0           # 清零梯度(避免跨 batch 累加)
    total_loss.backward()       # 反向传播,计算所有参数的梯度
    for p in model.parameters():
        p.data -= learning_rate * p.grad   # 梯度下降:θ := θ - α·∇L

Neuron 类的权重初始化使用了 He 初始化:

WN(0,2nin)

其中 nin 是输入维度。He 初始化专门配合 ReLU 激活函数,能有效缓解训练初期的梯度消失。

Layer 类:将 nin 个输入连接到 nout 个 Neuron,前向传播时每个 Neuron 独立计算,输出组合成一个列表。

MLP 类:按顺序堆叠多个 Layer,最终取输出层第一个神经元的输出(对于回归任务)。


第10步:演示5 — 梯度下降求函数最小值

函数:f(x)=x2+3x

解析解:f(x)=2x+3=0x=1.5,最小值 f(1.5)=2.25

x=5 开始,学习率 α=0.1,迭代 20 步:

python
x = Value(5.0)
for step in range(20):
    loss = x * x + Value(3.0) * x   # 构造 f(x) = x² + 3x
    x.grad = 0.0                     # 清零梯度
    loss.backward()                  # 反向传播,x.grad = 2x + 3
    x.data -= 0.1 * x.grad          # 梯度下降更新

每次迭代中,loss.backward() 自动计算 fx=2x+3,然后用梯度下降更新 x

这个演示展示了自动微分在优化问题中的应用——不需要手动求导,只需用 Value 对象构造目标函数,backward() 自动给出梯度。


辅助组件:计算图可视化

print_computation_graph(L) 将计算图以文本表格形式打印:节点深度、操作类型、数据值、梯度、输入节点。这帮助直观地看到:

  • 每个节点的前驱是谁(计算依赖关系)
  • 前向值和梯度的大小
  • 梯度从输出端向输入端递减的规律

关键概念速查表

概念数学公式代码实现
链式法则Lw=Laazzwself.grad += local_deriv * out.grad
加法门(a+b)a=1self.grad += 1.0 * out.grad
乘法门(ab)a=bself.grad += other.data * out.grad(梯度交换)
幂运算xnx=nxn1self.grad += (other * self.data ** (other-1)) * out.grad
ReLUReLU(x)=1[x>0]self.grad += (out.data > 0) * out.grad
Sigmoidσ(x)=σ(x)(1σ(x))self.grad += out.data * (1 - out.data) * out.grad
Tanhtanh(x)=1tanh2(x)self.grad += (1 - out.data**2) * out.grad
拓扑排序(DFS)后序遍历计算图build_topo(v) 递归 + reversed(topo) 逆序
梯度累积(Fan-out)Lh=iLuiuihself.grad += ...(用 += 而非 =
梯度清零zero_grad() / p.grad = 0.0
梯度下降θt+1=θtαθLp.data -= lr * p.grad

完整代码

py
# -*- coding: utf-8 -*-
"""
s06 反向传播与链式法则 — 演示代码
==================================
功能:从零实现一个微型自动微分引擎(mini autograd),
      模拟 PyTorch 的 .backward() 核心逻辑。

每个 Value 节点记录:
  - data: 该节点的数值
  - grad: 该节点累积的梯度
  - _backward: 该节点的局部反向传播函数
  - _prev: 该节点的前驱节点(用于拓扑排序)

运行方式:在 s06_backprop_chain_rule/ 目录下执行 python code/demo.py
"""

import os
import math
from typing import Set, List, Tuple

# 图片保存目录:固定为本章节的 images/ 目录(相对于本脚本的 ../images/)
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_IMAGES_DIR = os.path.join(_SCRIPT_DIR, '..', 'images')
os.makedirs(_IMAGES_DIR, exist_ok=True)


# ============================================================================
# 第一部分:Value 类 —— 自动微分的核心
# ============================================================================

class Value:
    """
    自动微分引擎中的基本计算节点。

    每个 Value 对象代表计算图中的一个节点,它存储:
    - data: 数值(标量)
    - grad: 累积的梯度(默认初始化为 0)
    - _backward: 局部反向传播函数(闭包)
    - _prev: 该节点的直接前驱节点集合

    灵感来源:Andrej Karpathy 的 micrograd 项目
    """

    def __init__(self, data: float, _children: Tuple = (), _op: str = ""):
        """
        初始化一个 Value 节点。

        参数:
            data: 节点的数值
            _children: 前驱节点元组(用于构建计算图)
            _op: 产生此节点的操作名称(如 "+", "*", "ReLU" 等)
        """
        self.data = data                # 存储数值
        self.grad = 0.0                 # 梯度初始化为 0(反向传播时累加)
        self._backward = lambda: None   # 默认的反向传播函数(叶子节点无需操作)
        self._prev = set(_children)     # 前驱节点集合(用于拓扑排序)
        self._op = _op                  # 操作名称(用于可视化)

    # ---- 基本算术运算 ----

    def __add__(self, other):
        """
        加法操作: self + other

        局部梯度: ∂(self+other)/∂self = 1, ∂(self+other)/∂other = 1
        反向传播: 上游梯度原样传递给两个输入(加法门)
        """
        # 如果 other 不是 Value,先转换为 Value(支持 Value + 数值)
        other = other if isinstance(other, Value) else Value(other)
        # 创建结果节点
        out = Value(self.data + other.data, (self, other), '+')

        # 定义加法门的局部反向传播
        def _backward():
            # 加法门:梯度原样传递
            self.grad += 1.0 * out.grad   # ∂out/∂self = 1
            other.grad += 1.0 * out.grad  # ∂out/∂other = 1

        out._backward = _backward  # 绑定反向传播函数
        return out

    def __mul__(self, other):
        """
        乘法操作: self * other

        局部梯度: ∂(self*other)/∂self = other, ∂(self*other)/∂other = self
        反向传播: 梯度交换(乘以对方的 data 值)
        """
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), '*')

        def _backward():
            # 乘法门:梯度交换(gradient switcheroo)
            self.grad += other.data * out.grad   # ∂out/∂self = other.data
            other.grad += self.data * out.grad   # ∂out/∂other = self.data

        out._backward = _backward
        return out

    def __pow__(self, other):
        """
        幂运算: self ** other (仅支持整数幂)

        局部梯度: ∂(x^n)/∂x = n * x^(n-1)
        """
        assert isinstance(other, (int, float)), "仅支持数值指数"
        out = Value(self.data ** other, (self,), f'**{other}')

        def _backward():
            # 幂函数的求导: d(x^n)/dx = n * x^(n-1)
            self.grad += (other * self.data ** (other - 1)) * out.grad

        out._backward = _backward
        return out

    def __neg__(self):
        """取负操作: -self 等价于 self * (-1)"""
        return self * -1

    def __sub__(self, other):
        """减法操作: self - other 等价于 self + (-other)"""
        return self + (-other)

    def __truediv__(self, other):
        """
        除法操作: self / other

        利用了: a / b = a * (b ** -1)
        局部梯度:
          ∂(a/b)/∂a = 1/b
          ∂(a/b)/∂b = -a / b^2
        """
        return self * other ** -1

    def __radd__(self, other):
        """右侧加法:other + self"""
        return self + other

    def __rmul__(self, other):
        """右侧乘法:other * self"""
        return self * other

    def __rsub__(self, other):
        """右侧减法:other - self"""
        return other + (-self)

    def __rtruediv__(self, other):
        """右侧除法:other / self"""
        return other * self ** -1

    # ---- 激活函数 ----

    def relu(self):
        """
        ReLU 激活函数: max(0, self)

        局部梯度: 1 if self.data > 0 else 0
        反向传播: 梯度门控——正区间通过,负区间截断为 0
        """
        out = Value(max(0.0, self.data), (self,), 'ReLU')

        def _backward():
            # ReLU 的局部导数:正区间为 1,负区间为 0
            self.grad += (out.data > 0) * out.grad

        out._backward = _backward
        return out

    def sigmoid(self):
        """
        Sigmoid 激活函数: 1 / (1 + e^{-x})

        局部梯度: σ(x) * (1 - σ(x))
        反向传播: 可以利用前向输出直接计算导数
        """
        # 数值稳定的 sigmoid 实现
        x = self.data
        # 对于非常大的负值,exp(-x) 会溢出,因此需要特殊情况处理
        if x >= 0:
            s = 1.0 / (1.0 + math.exp(-x))
        else:
            exp_x = math.exp(x)
            s = exp_x / (1.0 + exp_x)

        out = Value(s, (self,), 'Sigmoid')

        def _backward():
            # sigmoid 导数: σ(x)(1 - σ(x)) = out.data * (1 - out.data)
            self.grad += out.data * (1 - out.data) * out.grad

        out._backward = _backward
        return out

    def tanh(self):
        """
        Tanh 激活函数: (e^x - e^{-x}) / (e^x + e^{-x})

        局部梯度: 1 - tanh^2(x)
        反向传播: 利用前向输出直接计算导数
        """
        t = math.tanh(self.data)
        out = Value(t, (self,), 'Tanh')

        def _backward():
            # tanh 导数: 1 - tanh^2(x) = 1 - out.data^2
            self.grad += (1 - out.data ** 2) * out.grad

        out._backward = _backward
        return out

    def exp(self):
        """
        指数函数: e^{self}

        局部梯度: e^{self} = out.data
        反向传播: 梯度 = 节点自身的值
        """
        out = Value(math.exp(self.data), (self,), 'exp')

        def _backward():
            # exp 的导数就是它自己: d(e^x)/dx = e^x
            self.grad += out.data * out.grad

        out._backward = _backward
        return out

    # ---- 反向传播主函数 ----

    def backward(self):
        """
        执行反向传播:从当前节点(通常是损失)出发,
        按拓扑逆序遍历计算图,依次调用每个节点的 _backward()。

        算法步骤:
          1. 拓扑排序:找到从根节点到当前节点的所有路径上的节点
          2. 将当前节点的梯度设为 1.0(因为 ∂L/∂L = 1)
          3. 按拓扑逆序调用每个节点的 _backward()
        """
        # ---- 步骤 1: 拓扑排序(DFS 实现) ----
        topo = []      # 存储拓扑排序结果
        visited = set()  # 记录已访问节点

        def build_topo(v: Value):
            """深度优先搜索,构建拓扑排序"""
            if v not in visited:
                visited.add(v)              # 标记为已访问
                for child in v._prev:       # 递归访问所有前驱节点
                    build_topo(child)
                topo.append(v)              # 后序遍历:子节点在前,父节点在后

        build_topo(self)  # 从当前节点开始拓扑排序

        # ---- 步骤 2: 初始化梯度 ----
        self.grad = 1.0  # ∂L/∂L = 1(损失对自身的梯度恒为 1)

        # ---- 步骤 3: 按拓扑逆序调用 backward ----
        for node in reversed(topo):  # 逆序遍历拓扑排序(从输出到输入)
            node._backward()         # 调用该节点的局部反向传播

    # ---- 工具方法 ----

    def zero_grad(self):
        """将所有节点的梯度清零(每次反向传播前调用)"""
        # 遍历计算图中的所有节点
        visited = set()

        def _zero(v):
            if v not in visited:
                visited.add(v)
                v.grad = 0.0  # 梯度清零
                for child in v._prev:
                    _zero(child)

        _zero(self)

    def __repr__(self):
        """格式化输出:显示数值和梯度"""
        return f"Value(data={self.data:.4f}, grad={self.grad:.4f})"


# ============================================================================
# 第二部分:计算图可视化
# ============================================================================

def trace_graph(root: Value) -> Tuple[List[Value], Set[Tuple[Value, Value]]]:
    """
    从根节点出发,追踪整个计算图的所有节点和边。

    参数:
        root: 计算图的根节点(通常是输出/损失)

    返回:
        nodes: 所有节点的列表
        edges: 边的集合,每条边是 (parent, child) 元组
    """
    nodes = []       # 存储所有节点
    edges = set()    # 存储所有边
    visited = set()  # 记录已访问节点

    def build(v: Value):
        if v not in visited:
            visited.add(v)
            nodes.append(v)         # 记录节点
            for child in v._prev:   # 遍历所有子节点
                edges.add((child, v))  # 记录边:child → v
                build(child)

    build(root)
    return nodes, edges


def print_computation_graph(root: Value):
    """
    以文本形式打印计算图的结构和每个节点的梯度信息。

    参数:
        root: 计算图的根节点
    """
    nodes, edges = trace_graph(root)

    print("\n" + "=" * 70)
    print("【计算图结构】")
    print("=" * 70)

    # 按拓扑序打印(叶子节点在前)
    # 计算每个节点的"深度"(从叶子节点起的最长路径)
    depth = {}

    def compute_depth(v: Value) -> int:
        """递归计算节点的深度"""
        if v not in depth:
            if len(v._prev) == 0:
                depth[v] = 0  # 叶子节点深度为 0
            else:
                depth[v] = 1 + max(compute_depth(c) for c in v._prev)
        return depth[v]

    for node in nodes:
        compute_depth(node)

    # 按深度排序打印
    sorted_nodes = sorted(nodes, key=lambda v: depth.get(v, 0))

    print(f"\n{'深度':<6} {'操作':<10} {'数据值':<18} {'梯度':<18} {'输入'}")
    print("-" * 80)

    for node in sorted_nodes:
        d = depth.get(node, 0)
        op = node._op if node._op else "input"
        inputs_str = ", ".join([f"id={id(c) % 1000:03d}" for c in node._prev])
        if not node._prev:
            inputs_str = "(叶子节点)"
        print(f"{d:<6} {op:<10} {node.data:<18.6f} {node.grad:<18.8f} {inputs_str}")

    print("-" * 80)
    print(f"总节点数: {len(nodes)}, 总边数: {len(edges)}")
    print("=" * 70)


# ============================================================================
# 第三部分:神经网络模块
# ============================================================================

class Neuron:
    """
    单个神经元:w·x + b,后接激活函数

    参数:
        nin: 输入维度(特征数量)
        activation: 激活函数类型 ("relu", "sigmoid", "tanh", "linear")
    """

    def __init__(self, nin: int, activation: str = "relu"):
        import random
        # He 初始化:权重从 N(0, sqrt(2/nin)) 采样
        self.w = [Value(random.uniform(-1, 1) * math.sqrt(2.0 / nin)) for _ in range(nin)]
        self.b = Value(0.0)  # 偏置初始化为 0
        self.activation = activation

    def __call__(self, x: List[Value]) -> Value:
        """
        前向传播: activation(w·x + b)

        参数:
            x: 输入列表,长度必须等于 nin

        返回:
            神经元的输出 Value 节点
        """
        # 计算加权和 w·x + b
        # sum(w_i * x_i for each i) + b
        act = sum((wi * xi for wi, xi in zip(self.w, x)), self.b)

        # 应用激活函数
        if self.activation == "relu":
            out = act.relu()
        elif self.activation == "sigmoid":
            out = act.sigmoid()
        elif self.activation == "tanh":
            out = act.tanh()
        elif self.activation == "linear":
            out = act
        else:
            raise ValueError(f"不支持的激活函数: {self.activation}")

        return out

    def parameters(self) -> List[Value]:
        """返回该神经元的所有参数"""
        return self.w + [self.b]


class Layer:
    """
    神经网络中的一层:包含多个神经元

    参数:
        nin: 输入维度
        nout: 输出维度(该层的神经元数量)
        activation: 激活函数类型
    """

    def __init__(self, nin: int, nout: int, activation: str = "relu"):
        self.neurons = [Neuron(nin, activation) for _ in range(nout)]

    def __call__(self, x: List[Value]) -> List[Value]:
        """
        前向传播:对输入 x,每个神经元独立计算,输出列表

        参数:
            x: 输入列表

        返回:
            该层所有神经元的输出列表
        """
        return [n(x) for n in self.neurons]

    def parameters(self) -> List[Value]:
        """返回该层的所有参数"""
        params = []
        for neuron in self.neurons:
            params.extend(neuron.parameters())
        return params


class MLP:
    """
    多层感知机:按顺序堆叠多个 Layer

    参数:
        nin: 输入维度
        nouts: 每层输出维度的列表,如 [4, 4, 1] 表示两隐藏层+输出层
        activations: 每层激活函数的列表
    """

    def __init__(self, nin: int, nouts: List[int], activations: List[str]):
        # 构建层列表
        sizes = [nin] + nouts  # 如 [3, 4, 4, 1]
        self.layers = []
        for i in range(len(nouts)):
            self.layers.append(Layer(sizes[i], sizes[i + 1], activations[i]))

    def __call__(self, x: List[Value]) -> Value:
        """
        前向传播:数据依次经过每一层

        参数:
            x: 输入列表

        返回:
            最终输出节点(最后一层第一个神经元的输出,用于回归)
        """
        for layer in self.layers:
            x = layer(x)  # 将本层输出作为下层输入
        return x[0]  # 输出层取第一个神经元

    def parameters(self) -> List[Value]:
        """返回网络中所有可训练参数"""
        params = []
        for layer in self.layers:
            params.extend(layer.parameters())
        return params


# ============================================================================
# 第四部分:演示程序
# ============================================================================

def demo_basic_expression():
    """演示 1: 基本表达式的计算图和反向传播 (a*b + c) * d"""
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 10 + "演示 1: 基本表达式 (a*b + c) * d 的反向传播" + " " * 16 + "║")
    print("╚" + "═" * 68 + "╝")

    # ---- 前向传播 ----
    a = Value(2.0)  # a = 2
    b = Value(3.0)  # b = 3
    c = Value(4.0)  # c = 4
    d = Value(5.0)  # d = 5

    # 构建表达式: f = (a*b + c) * d
    e = a * b        # e = 2*3 = 6
    f = e + c        # f = 6+4 = 10
    L = f * d        # L = 10*5 = 50

    print(f"\n表达式: L = (a*b + c) * d")
    print(f"具体值: L = ({a.data}*{b.data} + {c.data}) * {d.data} = {L.data}")

    # ---- 反向传播 ----
    L.backward()  # 自动计算所有节点的梯度

    print(f"\n反向传播后的梯度:")
    print(f"  ∂L/∂a = {a.grad:.1f}  (预期: d * b = {d.data:.0f} * {b.data:.0f} = {d.data * b.data:.0f})")
    print(f"  ∂L/∂b = {b.grad:.1f}  (预期: d * a = {d.data:.0f} * {a.data:.0f} = {d.data * a.data:.0f})")
    print(f"  ∂L/∂c = {c.grad:.1f}  (预期: d = {d.data:.0f})")
    print(f"  ∂L/∂d = {d.grad:.1f}  (预期: a*b + c = {a.data*b.data + c.data:.0f})")

    # ---- 打印计算图 ----
    print_computation_graph(L)

    # ---- 手动验证 ----
    print("\n【手动验证】")
    # ∂L/∂a = d * (∂(a*b)/∂a) = d * b = 5*3 = 15
    expected_da = d.data * b.data
    print(f"  ∂L/∂a: 计算值={a.grad:.1f}, 预期值={expected_da:.1f}, 匹配={abs(a.grad - expected_da) < 1e-6}")
    # ∂L/∂b = d * a = 5*2 = 10
    expected_db = d.data * a.data
    print(f"  ∂L/∂b: 计算值={b.grad:.1f}, 预期值={expected_db:.1f}, 匹配={abs(b.grad - expected_db) < 1e-6}")
    # ∂L/∂c = d * 1 = 5
    expected_dc = d.data
    print(f"  ∂L/∂c: 计算值={c.grad:.1f}, 预期值={expected_dc:.1f}, 匹配={abs(c.grad - expected_dc) < 1e-6}")
    # ∂L/∂d = a*b + c = 10
    expected_dd = a.data * b.data + c.data
    print(f"  ∂L/∂d: 计算值={d.grad:.1f}, 预期值={expected_dd:.1f}, 匹配={abs(d.grad - expected_dd) < 1e-6}")


def demo_activation_functions():
    """演示 2: 激活函数及其反向传播"""
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 10 + "演示 2: 激活函数的反向传播 (ReLU, Sigmoid, Tanh)" + " " * 14 + "║")
    print("╚" + "═" * 68 + "╝")

    x = Value(1.5)

    # ---- ReLU ----
    print(f"\n--- ReLU ---")
    a_relu = x.relu()
    a_relu.backward()
    print(f"  x={x.data:.1f}, ReLU(x)={a_relu.data:.4f}, ∂ReLU/∂x={x.grad:.4f} (预期: 1.0, 因为 x>0)")

    # ---- Sigmoid ----
    x.zero_grad()  # 清空梯度,准备下一个测试
    print(f"\n--- Sigmoid ---")
    a_sig = x.sigmoid()
    a_sig.backward()
    expected_sig_grad = a_sig.data * (1 - a_sig.data)
    print(f"  x={x.data:.1f}, Sigmoid(x)={a_sig.data:.6f}, ∂Sigmoid/∂x={x.grad:.6f}")
    print(f"  预期导数: σ(x)(1-σ(x)) = {expected_sig_grad:.6f}")

    # ---- Tanh ----
    x.zero_grad()
    print(f"\n--- Tanh ---")
    a_tanh = x.tanh()
    a_tanh.backward()
    expected_tanh_grad = 1 - a_tanh.data ** 2
    print(f"  x={x.data:.1f}, Tanh(x)={a_tanh.data:.6f}, ∂Tanh/∂x={x.grad:.6f}")
    print(f"  预期导数: 1-tanh²(x) = {expected_tanh_grad:.6f}")

    # ---- ReLU 在负值的情况 ----
    x_neg = Value(-1.5)
    print(f"\n--- ReLU (负输入) ---")
    a_relu_neg = x_neg.relu()
    a_relu_neg.backward()
    print(f"  x={x_neg.data:.1f}, ReLU(x)={a_relu_neg.data:.1f}, ∂ReLU/∂x={x_neg.grad:.1f} (预期: 0.0, 因为 x<0)")


def demo_fanout():
    """演示 3: Fan-out —— 梯度累积"""
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 10 + "演示 3: Fan-Out 梯度累积             " + " " * 27 + "║")
    print("╚" + "═" * 68 + "╝")

    # ---- 构建 fan-out 示例: L = (2x) * (x + 3) ----
    x = Value(2.0)
    # 路径 1: x → *2 → u
    u = x * 2    # u = 2x = 4
    # 路径 2: x → +3 → v
    v = x + 3    # v = x+3 = 5
    # 两条路径汇合: u * v = L
    L = u * v    # L = 4*5 = 20

    print(f"\n表达式: L = (2x) * (x + 3)")
    print(f"当 x=2: u=2x={u.data}, v=x+3={v.data}, L={L.data}")

    L.backward()

    # 验证梯度
    # ∂L/∂x = ∂L/∂u · ∂u/∂x + ∂L/∂v · ∂v/∂x
    #        = v · 2        + u · 1
    #        = 5 · 2        + 4 · 1
    #        = 10 + 4 = 14
    expected_grad = v.data * 2 + u.data * 1
    print(f"\n反向传播结果:")
    print(f"  ∂L/∂x = {x.grad:.1f}")
    print(f"  预期值 = v·2 + u·1 = {v.data:.0f}·2 + {u.data:.0f}·1 = {expected_grad:.0f}")
    print(f"  分解: 路径1贡献 = {v.data * 2:.0f} (∂L/∂u · ∂u/∂x = v · 2)")
    print(f"        路径2贡献 = {u.data:.0f} (∂L/∂v · ∂v/∂x = u · 1)")
    print(f"        总和 = {v.data * 2 + u.data:.0f}")

    print(f"\n  梯度匹配: {abs(x.grad - expected_grad) < 1e-6}")


def demo_mini_neural_network():
    """演示 4: 使用 mini autograd 训练一个小神经网络"""
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 10 + "演示 4: 小神经网络完整训练 (正向+反向+更新)" + " " * 16 + "║")
    print("╚" + "═" * 68 + "╝")

    # ---- 创建一个简单的 MLP: 2输入 → 4隐藏 → 1输出 ----
    model = MLP(2, [4, 1], ["relu", "linear"])
    print(f"\n网络结构: 输入=2, 隐藏层=4(ReLU), 输出=1(线性)")
    print(f"总参数量: {len(model.parameters())}")

    # ---- 生成简单的训练数据 ----
    # 目标函数: y = 3*x1^2 - 2*x2 + 1
    xs = [
        [Value(1.0), Value(2.0)],
        [Value(2.0), Value(1.0)],
        [Value(0.0), Value(3.0)],
        [Value(3.0), Value(0.0)],
    ]
    ys = [3*1*1 - 2*2 + 1, 3*2*2 - 2*1 + 1, 3*0*0 - 2*3 + 1, 3*3*3 - 2*0 + 1]
    # 目标值: [0, 11, -5, 28]

    print(f"\n训练数据:")
    for i, (xi, yi) in enumerate(zip(xs, ys)):
        print(f"  样本{i+1}: x={[v.data for v in xi]}, y={yi}")

    # ---- 训练循环 ----
    learning_rate = 0.01
    n_epochs = 100

    print(f"\n开始训练 (lr={learning_rate}, epochs={n_epochs})...")

    for epoch in range(n_epochs):
        # ---- 前向传播 ----
        y_preds = [model(x) for x in xs]  # 对每个样本进行预测

        # ---- 计算损失 (MSE) ----
        # L = (1/N) * Σ (y_pred - y_true)²
        losses = [(yp - Value(y_true)) ** 2 for yp, y_true in zip(y_preds, ys)]
        total_loss = sum(losses[1:], losses[0])  # 对损失求和
        # 注意:这里没有除以 N,实际训练时除以 N 不影响优化方向

        # ---- 反向传播 ----
        # 先清零所有参数的梯度
        for p in model.parameters():
            p.grad = 0.0
        total_loss.backward()  # 反向传播,计算所有梯度

        # ---- 参数更新 (梯度下降) ----
        for p in model.parameters():
            p.data -= learning_rate * p.grad  # θ := θ - α·∇L

        # ---- 打印训练进度 ----
        if epoch % 20 == 0 or epoch == n_epochs - 1:
            print(f"  Epoch {epoch:3d}: loss = {total_loss.data:.4f}, "
                  f"预测 = {[f'{yp.data:.2f}' for yp in y_preds]}")

    print(f"\n训练完成!最终预测 vs 目标:")
    for i, (xi, yi) in enumerate(zip(xs, ys)):
        yp = model(xi)
        print(f"  样本{i+1}: 预测={yp.data:.2f}, 目标={yi}, 误差={abs(yp.data - yi):.2f}")


def demo_gradient_descent_1d():
    """演示 5: 用自动微分做梯度下降,求 f(x) = x² + 3x 的最小值"""
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 8 + "演示 5: 自动微分 + 梯度下降求 f(x)=x²+3x 最小值" + " " * 14 + "║")
    print("╚" + "═" * 68 + "╝")

    x = Value(5.0)  # 初始值 x=5
    learning_rate = 0.1
    n_steps = 20

    print(f"\nf(x) = x² + 3x, 初始 x = {x.data:.1f}")
    print(f"解析解: f'(x)=2x+3=0 → x = -1.5")
    print(f"最小值: f(-1.5) = (-1.5)² + 3(-1.5) = 2.25 - 4.5 = -2.25")

    print(f"\n梯度下降过程 (lr={learning_rate}):")
    print(f"{'步骤':<6} {'x':<12} {'f(x)':<12} {'梯度':<12}")

    for step in range(n_steps):
        # 构造表达式
        loss = x * x + Value(3.0) * x  # f(x) = x² + 3x

        # 清零梯度
        x.grad = 0.0

        # 反向传播
        loss.backward()

        # 打印当前状态
        if step % 5 == 0 or step == n_steps - 1:
            print(f"{step:<6} {x.data:<12.4f} {loss.data:<12.6f} {x.grad:<12.4f}")

        # 参数更新
        x.data -= learning_rate * x.grad

    print(f"\n最终结果: x ≈ {x.data:.4f} (理论最优: -1.5)")
    print(f"最小函数值: f({x.data:.4f}) ≈ {x.data**2 + 3*x.data:.4f} (理论: -2.25)")


# ============================================================================
# 主程序入口
# ============================================================================

if __name__ == "__main__":
    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║     s06 反向传播与链式法则 — 从零实现 mini autograd 引擎        ║")
    print("║     灵感来源: Andrej Karpathy 的 micrograd                       ║")
    print("╚══════════════════════════════════════════════════════════════════╝")

    # 依次运行所有演示
    demo_basic_expression()       # 基本表达式和链式法则
    demo_activation_functions()   # 激活函数的反向传播
    demo_fanout()                 # Fan-out 梯度累积
    demo_mini_neural_network()    # 完整神经网络训练
    demo_gradient_descent_1d()    # 自动微分做优化

    print("\n" + "=" * 70)
    print("【总结】")
    print("=" * 70)
    print("  ✓ 实现了 Value 类,支持自动微分的核心操作")
    print("  ✓ 构建了计算图,按拓扑逆序执行反向传播")
    print("  ✓ 验证了链式法则、局部梯度规则和梯度累积")
    print("  ✓ 用 mini autograd 完成了完整的神经网络训练流程")
    print("  ✓ 核心原理与 PyTorch/TensorFlow 的 autograd 一致")
    print("=" * 70)