Skip to content

s08 优化器:从SGD到Adam — demo.py 代码详解

Download demo.py

运行方式

bash
cd s08_optimizers_sgd_to_adam/code
python demo.py

代码逐段详解

第1步:导入库 — 每个库是做什么的

python
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, List, Dict, Callable
import os
  • numpy:提供数组操作、随机数生成(np.random.randn 用于模拟梯度噪声)、数学运算(np.sqrtnp.linalg.norm)。
  • matplotlib:绘制损失地形等高线图、优化器轨迹、损失曲线、超参数游乐场子图。
  • typing:类型注解,标注函数签名中的参数和返回值类型。

第2步:损失地形 — 狭长峡谷形的二维二次型

python
class LossLandscape:
    def __init__(self, a: float = 20.0, b: float = 1.0):
        self.a = a  # 陡峭方向曲率
        self.b = b  # 平缓方向曲率

    def __call__(self, theta: np.ndarray) -> float:
        theta1, theta2 = theta[0], theta[1]
        return 0.5 * (self.a * theta1 ** 2 + self.b * theta2 ** 2)

    def gradient(self, theta: np.ndarray) -> np.ndarray:
        theta1, theta2 = theta[0], theta[1]
        return np.array([self.a * theta1, self.b * theta2])

损失函数定义:

L(θ1,θ2)=12(aθ12+bθ22)

梯度:

L(θ1,θ2)=[aθ1bθ2]

条件数(Condition Number) κ=a/b=20 决定了地形的"狭长度"。κ1 意味着:

  • θ1 方向(系数 a=20):陡峭,梯度 =20θ1,稍微偏离原点就产生很大梯度
  • θ2 方向(系数 b=1):平缓,梯度 =θ2,偏离较多才有较大梯度

全局最优解在原点 (0,0),最小损失 =0。这个简单的二次型能清晰展示不同优化器在"狭长峡谷"中的表现差异。


第3步:SGD — 最朴素的优化器

python
class SGDOptimizer:
    def __init__(self, lr: float = 0.02):
        self.lr = lr

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        return theta - self.lr * grad

更新公式:

θt+1=θtαgt

特点:不记忆任何历史信息。每一步只看当前梯度,直接往反方向走。这是最纯粹的梯度下降,也是所有改进的基准线(baseline)。

SGD 存储开销:0 个额外向量——只需要存储参数本身。


第4步:Momentum — 给优化器加"惯性"

python
class MomentumOptimizer:
    def __init__(self, lr: float = 0.02, beta: float = 0.9):
        self.lr = lr
        self.beta = beta
        self.m = None  # 速度向量

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        if self.m is None:
            self.m = np.zeros_like(theta)  # m_0 = 0

        self.m = self.beta * self.m + (1 - self.beta) * grad
        return theta - self.lr * self.m

更新公式:

mt=βmt1+(1β)gtθt+1=θtαmt

直觉mt 是梯度的指数滑动平均(EMA)β=0.9 意味着大约 90% 的权重来自历史梯度,10% 来自当前梯度。有效记忆长度约 11β10 步。

Momentum 存储开销:1 个额外向量(mt,与参数同形)。相比 SGD 多了一倍的存储,但训练更平稳。

为什么 (1-beta) 而非直接 beta?这是"凸组合"的标准写法:确保所有权重之和为 1。展开后 mt=(1β)(gt+βgt1+β2gt2+)


第5步:RMSProp — 给每个参数自适应步长

python
class RMSPropOptimizer:
    def __init__(self, lr: float = 0.05, beta: float = 0.9, eps: float = 1e-8):
        self.lr = lr
        self.beta = beta
        self.eps = eps
        self.v = None

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        if self.v is None:
            self.v = np.zeros_like(theta)

        self.v = self.beta * self.v + (1 - self.beta) * (grad ** 2)
        return theta - self.lr * grad / (np.sqrt(self.v) + self.eps)

更新公式:

vt=βvt1+(1β)gtgtθt+1=θtαgtvt+ϵ

直觉vt 是梯度平方的 EMA,估算每个参数的梯度方差。对于梯度大的参数(陡峭方向),vt 大 → 分母 vt 大 → 有效步长自动变小。对于梯度小的参数(平缓方向),vt 小 → 分母小 → 有效步长相对更大。

ϵ=108 的作用:防止除以零。在训练的极早期,某些参数的 vt 可能仍接近 0,ϵ 提供了数值稳定性。

RMSProp 存储开销:1 个额外向量(vt)。


第6步:Adam — Momentum + RMSProp + 偏差修正

python
class AdamOptimizer:
    def __init__(self, lr: float = 0.1, beta1: float = 0.9,
                 beta2: float = 0.999, eps: float = 1e-8):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.m = None   # 一阶矩
        self.v = None   # 二阶矩
        self.t = 0      # 迭代步数计数器

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        if self.m is None:
            self.m = np.zeros_like(theta)
            self.v = np.zeros_like(theta)

        self.t += 1

        # 一阶矩(Momentum 部分)
        self.m = self.beta1 * self.m + (1 - self.beta1) * grad
        # 二阶矩(RMSProp 部分)
        self.v = self.beta2 * self.v + (1 - self.beta2) * (grad ** 2)

        # 偏差修正
        m_hat = self.m / (1 - self.beta1 ** self.t)
        v_hat = self.v / (1 - self.beta2 ** self.t)

        # 参数更新
        return theta - self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

完整数学公式

一阶矩(方向):

mt=β1mt1+(1β1)gt

二阶矩(尺度):

vt=β2vt1+(1β2)gtgt

偏差修正:

m^t=mt1β1t,v^t=vt1β2t

参数更新(Adam 核心公式):

θt+1=θtαm^tv^t+ϵ

偏差修正为什么必要? mtvt 都从 0 初始化。在训练最初几步,它们的值系统性地偏小(如第一步 m1=0.1g1,只有真实梯度的 10%)。除以 1β1t 补偿了这个初始化偏差:第 1 步时 10.91=0.1,修正后 m^1=m1/0.1=g1,完美补偿。

Adam 存储开销:2 个额外向量(mtvt),是四种优化器中存储需求最大的。对于大模型(如有 10 亿参数),这意味着额外需要约 8GB 显存(每个参数存储 2 个 float32 状态)。


第7步:运行优化器 — 记录完整轨迹

python
def run_optimizer(optimizer, landscape, theta_init, n_steps=100,
                  add_noise=False, noise_std=0.0):
    theta = theta_init.copy()
    trajectory = [theta.copy()]
    losses = [landscape(theta)]

    for _ in range(n_steps):
        grad = landscape.gradient(theta)

        if add_noise:
            grad = grad + np.random.randn(*grad.shape) * noise_std

        theta = optimizer.step(theta, grad)
        trajectory.append(theta.copy())
        losses.append(landscape(theta))

    return np.array(trajectory), losses

add_noise 参数用于模拟 mini-batch 梯度噪声——真实训练中,我们只能用一个 mini-batch 估计梯度,估计值总是带有噪声。这个演示让你直观地看到:Adam 对噪声的鲁棒性远优于 SGD。


第8步:可视化 — 三种对比视角

视角1:等高线图上的轨迹对比

python
def plot_contour_comparison(landscape, all_trajectories, filename):
    # 生成等高线
    Z = 0.5 * (landscape.a * X**2 + landscape.b * Y**2)
    levels = np.logspace(-2, 2, 15)  # 对数间隔等高线
    ax.contour(X, Y, Z, levels=levels, cmap='Blues')
    ax.contourf(X, Y, Z, levels=levels, cmap='Blues', alpha=0.15)

    # 绘制每条优化器轨迹
    for name, traj in all_trajectories.items():
        ax.plot(traj[:, 0], traj[:, 1], '-', color=color, label=name)
        ax.plot(traj[0, 0], traj[0, 1], 'o')   # 起点
        ax.plot(traj[-1, 0], traj[-1, 1], 's')  # 终点

np.logspace(-2, 2, 15) 生成对数间隔的 15 个高度值——近距离处(接近原点)等高线密集,远处稀疏,配合狭长峡谷的损失地形。起点用小圆点标记,终点用方块标记,清晰展示每种优化器从 (3.0,3.0) 到达原点附近的路径。

视角2:损失下降曲线

对数纵轴的折线图,直观对比收敛速度。Adam 的曲线通常下降最快,SGD 在陡峭方向上震荡导致损失下降缓慢。

视角3:超参数游乐场

4 张子图分别对应 lr = 0.01, 0.05, 0.1, 0.5,每张绘制四种优化器的轨迹。展示学习率对优化器的影响

  • 小 lr = 0.01:所有优化器都收敛缓慢
  • 大 lr = 0.5:SGD 剧烈震荡甚至发散,Adam 仍能稳定收敛

这直观展示了 Adam 对学习率的鲁棒性——在较宽的 lr 范围内都能稳定工作。


第9步:噪声鲁棒性对比

python
noisy_sgd = SGDOptimizer(lr=0.02)
noisy_adam = AdamOptimizer(lr=0.1)

traj_sgd_noisy, loss_sgd_noisy = run_optimizer(
    noisy_sgd, landscape, theta_init, n_steps=100,
    add_noise=True, noise_std=1.0  # 标准差为 1.0 的高斯噪声
)

梯度中加入 noise_std=1.0 的噪声后,SGD 的路径剧烈抖动(每一步的梯度方向都受噪声影响),而 Adam 由于 mt 的平滑作用,路径相对平稳——这是 Adam 在实际训练中表现良好的关键原因之一。


关键概念速查表

优化器核心记忆更新公式存储解决的痛点
SGDθαgt0—(基线)
Momentummt(一阶矩)θαmt1x方向抖动
RMSPropvt(二阶矩)θαgt/vt1x步长不统一
Adammt + vtθαm^t/v^t2x方向 + 步长 + 初始化偏差
条件数κ=a/b衡量损失地形"狭长度"
指数滑动平均mt=βmt1+(1β)gtAdam 的基础运算

完整代码

py
# -*- coding: utf-8 -*-
"""
s08 优化器:从 SGD 到 Adam — 演示代码
======================================
功能:在二维损失地形上可视化对比 SGD、Momentum、RMSProp、Adam 的优化轨迹。
      包括损失曲线、超参数游乐场(可调学习率和 β 值)。

运行方式:在 s08_optimizers_sgd_to_adam/ 目录下执行 python code/demo.py
"""

import numpy as np
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False
from matplotlib.patches import FancyBboxPatch
from typing import Tuple, List, Dict, Callable
import os

_HERE = os.path.dirname(os.path.abspath(__file__))
_IMAGES = os.path.join(_HERE, '..', 'images')
os.makedirs(_IMAGES, exist_ok=True)

# ============================================================================
# 第一部分:定义损失函数(狭长峡谷形 2D 二次型)
# ============================================================================

class LossLandscape:
    """
    二维二次型损失函数:L(θ₁, θ₂) = 0.5 * (a·θ₁² + b·θ₂²)

    当 a >> b 时,形成狭长峡谷地形:
      - θ₁ 方向(大系数 a):陡峭方向,梯度大
      - θ₂ 方向(小系数 b):平缓方向,梯度小

    参数:
        a: θ₁ 方向的曲率(默认 20.0,陡峭)
        b: θ₂ 方向的曲率(默认 1.0,平缓)
    """

    def __init__(self, a: float = 20.0, b: float = 1.0):
        self.a = a  # 陡峭方向曲率
        self.b = b  # 平缓方向曲率

    def __call__(self, theta: np.ndarray) -> float:
        """
        计算损失值。

        参数:
            theta: 参数向量,shape (2,)

        返回:
            损失值(标量)
        """
        theta1, theta2 = theta[0], theta[1]
        return 0.5 * (self.a * theta1 ** 2 + self.b * theta2 ** 2)

    def gradient(self, theta: np.ndarray) -> np.ndarray:
        """
        计算梯度 ∇L(θ)。

        参数:
            theta: 参数向量,shape (2,)

        返回:
            梯度向量,shape (2,): [a·θ₁, b·θ₂]
        """
        theta1, theta2 = theta[0], theta[1]
        return np.array([self.a * theta1, self.b * theta2])

    @property
    def optimum(self) -> np.ndarray:
        """返回全局最优点 θ* = (0, 0),最小损失值为 0"""
        return np.array([0.0, 0.0])


# ============================================================================
# 第二部分:优化器实现
# ============================================================================

class SGDOptimizer:
    """
    朴素 SGD 优化器。

    更新公式: θ_{t+1} = θ_t - α · g_t

    参数:
        lr: 学习率 α
    """

    def __init__(self, lr: float = 0.02):
        self.lr = lr
        self.name = "SGD"

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        """
        执行一步 SGD 更新。

        参数:
            theta: 当前参数
            grad: 当前梯度

        返回:
            更新后的参数
        """
        return theta - self.lr * grad  # θ := θ - α·∇L


class MomentumOptimizer:
    """
    Momentum 优化器(带动量的 SGD)。

    公式:
        m_t = β · m_{t-1} + (1-β) · g_t
        θ_{t+1} = θ_t - α · m_t

    参数:
        lr: 学习率 α
        beta: 动量衰减系数 β(默认 0.9)
    """

    def __init__(self, lr: float = 0.02, beta: float = 0.9):
        self.lr = lr
        self.beta = beta
        self.m = None  # 速度向量,初始化为 None,第一次 step 时初始化
        self.name = "Momentum"

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        """
        执行一步 Momentum 更新。

        参数:
            theta: 当前参数
            grad: 当前梯度

        返回:
            更新后的参数
        """
        # 首次调用时,初始化动量向量为零
        if self.m is None:
            self.m = np.zeros_like(theta)  # m_0 = 0

        # m_t = β · m_{t-1} + (1-β) · g_t
        self.m = self.beta * self.m + (1 - self.beta) * grad
        # θ_{t+1} = θ_t - α · m_t
        return theta - self.lr * self.m


class RMSPropOptimizer:
    """
    RMSProp 优化器。

    公式:
        v_t = β · v_{t-1} + (1-β) · g_t²
        θ_{t+1} = θ_t - α · g_t / (√v_t + ε)

    参数:
        lr: 学习率 α
        beta: 衰减系数 β(默认 0.9)——注意 RMSProp 通常也用 0.9
        eps: 数值稳定常数 ε
    """

    def __init__(self, lr: float = 0.02, beta: float = 0.9, eps: float = 1e-8):
        self.lr = lr
        self.beta = beta
        self.eps = eps
        self.v = None  # 梯度平方的滑动平均,第一次 step 时初始化
        self.name = "RMSProp"

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        """
        执行一步 RMSProp 更新。

        参数:
            theta: 当前参数
            grad: 当前梯度

        返回:
            更新后的参数
        """
        if self.v is None:
            self.v = np.zeros_like(theta)  # v_0 = 0

        # v_t = β · v_{t-1} + (1-β) · g_t²
        self.v = self.beta * self.v + (1 - self.beta) * (grad ** 2)
        # θ_{t+1} = θ_t - α · g_t / (√v_t + ε)
        return theta - self.lr * grad / (np.sqrt(self.v) + self.eps)


class AdamOptimizer:
    """
    Adam 优化器(带偏差修正)。

    公式:
        m_t = β₁ · m_{t-1} + (1-β₁) · g_t          (一阶矩/方向)
        v_t = β₂ · v_{t-1} + (1-β₂) · g_t²         (二阶矩/尺度)
        m̂_t = m_t / (1 - β₁^t)                      (一阶矩偏差修正)
        v̂_t = v_t / (1 - β₂^t)                      (二阶矩偏差修正)
        θ_{t+1} = θ_t - α · m̂_t / (√v̂_t + ε)       (参数更新)

    参数:
        lr: 学习率 α(默认 0.1)
        beta1: 一阶矩衰减率(默认 0.9)
        beta2: 二阶矩衰减率(默认 0.999)
        eps: 数值稳定常数(默认 1e-8)
    """

    def __init__(self, lr: float = 0.1, beta1: float = 0.9,
                 beta2: float = 0.999, eps: float = 1e-8):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.m = None    # 一阶矩向量
        self.v = None    # 二阶矩向量
        self.t = 0       # 迭代步数计数器
        self.name = "Adam"

    def step(self, theta: np.ndarray, grad: np.ndarray) -> np.ndarray:
        """
        执行一步 Adam 更新(含偏差修正)。

        参数:
            theta: 当前参数
            grad: 当前梯度

        返回:
            更新后的参数
        """
        if self.m is None:
            self.m = np.zeros_like(theta)  # m_0 = 0
            self.v = np.zeros_like(theta)  # v_0 = 0

        self.t += 1  # 迭代步数 +1

        # ---- step 1: 更新一阶矩(动量) ----
        self.m = self.beta1 * self.m + (1 - self.beta1) * grad

        # ---- step 2: 更新二阶矩(梯度平方的均值) ----
        self.v = self.beta2 * self.v + (1 - self.beta2) * (grad ** 2)

        # ---- step 3: 偏差修正 ----
        m_hat = self.m / (1 - self.beta1 ** self.t)  # 修正一阶矩
        v_hat = self.v / (1 - self.beta2 ** self.t)  # 修正二阶矩

        # ---- step 4: 参数更新 ----
        return theta - self.lr * m_hat / (np.sqrt(v_hat) + self.eps)


# ============================================================================
# 第三部分:训练与轨迹记录
# ============================================================================

def run_optimizer(
    optimizer,
    landscape: LossLandscape,
    theta_init: np.ndarray,
    n_steps: int = 100,
    add_noise: bool = False,
    noise_std: float = 0.0
) -> Tuple[np.ndarray, List[float]]:
    """
    在给定的损失地形上运行一个优化器,记录完整轨迹。

    参数:
        optimizer: 优化器对象(SGD / Momentum / RMSProp / Adam)
        landscape: 损失函数对象
        theta_init: 初始参数位置
        n_steps: 迭代步数
        add_noise: 是否在梯度中添加噪声(模拟 mini-batch 噪声)
        noise_std: 噪声标准差

    返回:
        trajectory: 每一步的参数位置,shape (n_steps+1, 2)
        losses: 每一步的损失值列表
    """
    theta = theta_init.copy()  # 当前参数
    trajectory = [theta.copy()]  # 记录初始位置
    losses = [landscape(theta)]  # 记录初始损失

    for _ in range(n_steps):
        grad = landscape.gradient(theta)  # 计算当前梯度

        # 添加噪声(模拟 mini-batch SGD 的梯度噪声)
        if add_noise:
            grad = grad + np.random.randn(*grad.shape) * noise_std

        theta = optimizer.step(theta, grad)  # 优化器更新一步
        trajectory.append(theta.copy())  # 记录位置
        losses.append(landscape(theta))  # 记录损失

    return np.array(trajectory), losses


# ============================================================================
# 第四部分:可视化
# ============================================================================

def plot_contour_comparison(
    landscape: LossLandscape,
    all_trajectories: Dict[str, np.ndarray],
    filename: str = "optimizer_trajectories.png"
):
    """
    在同一张等高线图上绘制所有优化器的轨迹,对比收敛行为。

    参数:
        landscape: 损失函数
        all_trajectories: {优化器名称: 轨迹数组} 的字典
        filename: 保存的文件名
    """
    # 创建等高线网格
    x_range = np.linspace(-4, 4, 200)
    y_range = np.linspace(-4, 4, 200)
    X, Y = np.meshgrid(x_range, y_range)
    Z = 0.5 * (landscape.a * X ** 2 + landscape.b * Y ** 2)

    fig, ax = plt.subplots(1, 1, figsize=(10, 8))

    # 绘制损失等高线(对数刻度以更好地显示峡谷结构)
    levels = np.logspace(-2, 2, 15)  # 对数间隔的等高线
    contour = ax.contour(X, Y, Z, levels=levels, cmap='Blues', alpha=0.6, linewidths=0.8)
    ax.clabel(contour, inline=True, fontsize=8, fmt='%.1f')

    # 绘制填充等高线背景
    ax.contourf(X, Y, Z, levels=levels, cmap='Blues', alpha=0.15)

    # 标记最优点
    optimum = landscape.optimum
    ax.plot(optimum[0], optimum[1], 'r*', markersize=15, label='θ* (Optimum)', zorder=10)

    # 绘制每条优化器轨迹
    colors = {'SGD': '#E74C3C', 'Momentum': '#2ECC71',
              'RMSProp': '#3498DB', 'Adam': '#9B59B6'}
    markers = {'SGD': 'o', 'Momentum': 's', 'RMSProp': '^', 'Adam': 'D'}

    for name, traj in all_trajectories.items():
        color = colors.get(name, 'gray')
        marker = markers.get(name, 'o')
        # 绘制轨迹线
        ax.plot(traj[:, 0], traj[:, 1], '-', color=color, linewidth=2,
                alpha=0.8, label=f'{name}')
        # 标注起点
        ax.plot(traj[0, 0], traj[0, 1], marker=marker, color=color,
                markersize=10, markeredgecolor='white', markeredgewidth=1.5)
        # 标注终点
        ax.plot(traj[-1, 0], traj[-1, 1], marker=marker, color=color,
                markersize=12, markeredgecolor='black', markeredgewidth=1.5)

    ax.set_xlabel('θ₁ (Flat Direction)', fontsize=13)
    ax.set_ylabel('θ₂ (Steep Direction)', fontsize=13)
    ax.set_title(f'Optimizer Trajectory Comparison\nL(theta) = 0.5*({landscape.a}*theta1^2 + {landscape.b}*theta2^2)',
                 fontsize=14, fontweight='bold')
    ax.legend(loc='upper right', fontsize=10, framealpha=0.9)
    ax.set_xlim(-4, 4)
    ax.set_ylim(-4, 4)
    ax.set_aspect('equal')
    ax.grid(True, alpha=0.2)

    # 添加文字注释
    ax.text(-3.5, 3.5, f'Condition Number κ = {landscape.a/landscape.b:.0f}',
            fontsize=11, bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))

    plt.tight_layout()
    out = os.path.join(_IMAGES, filename)
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 优化器轨迹对比图已保存至 {out}")


def plot_loss_curves(
    all_losses: Dict[str, List[float]],
    filename: str = "loss_curves.png"
):
    """
    绘制各优化器的损失-迭代步数曲线对比。

    参数:
        all_losses: {优化器名称: 损失列表} 的字典
        filename: 保存的文件名
    """
    fig, ax = plt.subplots(1, 1, figsize=(10, 6))

    colors = {'SGD': '#E74C3C', 'Momentum': '#2ECC71',
              'RMSProp': '#3498DB', 'Adam': '#9B59B6'}

    for name, losses in all_losses.items():
        color = colors.get(name, 'gray')
        ax.plot(losses, '-', color=color, linewidth=2, alpha=0.8, label=name)

    ax.set_xlabel('Iteration', fontsize=13)
    ax.set_ylabel('Loss L(θ)', fontsize=13)
    ax.set_title('Loss Curve Comparison', fontsize=14, fontweight='bold')
    ax.set_yscale('log')  # 对数刻度
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)

    # 标注最终损失值
    y_max = ax.get_ylim()[1]
    for i, (name, losses) in enumerate(all_losses.items()):
        final_loss = losses[-1]
        ax.annotate(f'{name}: {final_loss:.2e}',
                    xy=(len(losses) - 1, final_loss),
                    xytext=(len(losses) - 1 - 30, y_max * (0.5 ** i * 0.8)),
                    fontsize=9, color=colors.get(name, 'gray'),
                    arrowprops=dict(arrowstyle='->', color=colors.get(name, 'gray')))

    plt.tight_layout()
    out = os.path.join(_IMAGES, filename)
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 损失曲线对比图已保存至 {out}")


def plot_hyperparameter_playground(
    landscape: LossLandscape,
    theta_init: np.ndarray,
    filename: str = "hyperparameter_playground.png"
):
    """
    超参数游乐场:展示不同学习率下各优化器的表现。

    对比不同学习率 (0.01, 0.05, 0.1, 0.5) 对四种优化器的影响。

    参数:
        landscape: 损失函数
        theta_init: 初始参数
        filename: 保存的文件名
    """
    learning_rates = [0.01, 0.05, 0.1, 0.5]
    fig, axes = plt.subplots(2, 2, figsize=(14, 12))
    axes = axes.flatten()

    colors = {'SGD': '#E74C3C', 'Momentum': '#2ECC71',
              'RMSProp': '#3498DB', 'Adam': '#9B59B6'}

    for idx, lr in enumerate(learning_rates):
        ax = axes[idx]

        # 创建等高线背景
        x_range = np.linspace(-4, 4, 150)
        y_range = np.linspace(-4, 4, 150)
        X, Y = np.meshgrid(x_range, y_range)
        Z = 0.5 * (landscape.a * X ** 2 + landscape.b * Y ** 2)
        levels = np.logspace(-2, 2, 12)
        ax.contour(X, Y, Z, levels=levels, cmap='Blues', alpha=0.4, linewidths=0.5)
        ax.contourf(X, Y, Z, levels=levels, cmap='Blues', alpha=0.08)

        # 标记最优点
        ax.plot(0, 0, 'r*', markersize=12, zorder=10)

        # 对每个优化器使用当前学习率
        n_steps = 80
        optimizers = [
            SGDOptimizer(lr=lr),
            MomentumOptimizer(lr=lr),
            RMSPropOptimizer(lr=lr),
            AdamOptimizer(lr=lr),
        ]

        for opt in optimizers:
            name = opt.name
            # 重置优化器状态并运行
            opt.m = None
            opt.v = None
            if hasattr(opt, 't'):
                opt.t = 0

            traj, _ = run_optimizer(opt, landscape, theta_init, n_steps)
            color = colors.get(name, 'gray')
            ax.plot(traj[:, 0], traj[:, 1], '-', color=color, linewidth=1.8,
                    alpha=0.8, label=name)
            ax.plot(traj[0, 0], traj[0, 1], 'o', color=color, markersize=6)
            ax.plot(traj[-1, 0], traj[-1, 1], 's', color=color, markersize=8,
                    markeredgecolor='black', markeredgewidth=1)

        ax.set_title(f'Learning Rate α = {lr}', fontsize=13, fontweight='bold')
        ax.set_xlim(-4, 4)
        ax.set_ylim(-4, 4)
        ax.set_aspect('equal')
        ax.grid(True, alpha=0.2)
        if idx == 0:
            ax.legend(loc='upper right', fontsize=8)

    plt.suptitle('Hyperparameter Playground: Effect of Learning Rate on Optimizers',
                 fontsize=16, fontweight='bold', y=1.01)
    plt.tight_layout()
    out = os.path.join(_IMAGES, filename)
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 超参数游乐场已保存至 {out}")


# ============================================================================
# 第五部分:主程序
# ============================================================================

def main():
    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║   s08 优化器:从 SGD 到 Adam — 损失地形上的轨迹对比             ║")
    print("╚══════════════════════════════════════════════════════════════════╝")

    # ---- 1. 创建损失地形 ----
    # 条件数 κ = a/b = 20,形成狭长峡谷
    landscape = LossLandscape(a=20.0, b=1.0)
    theta_init = np.array([3.0, 2.5])  # 初始位置在右上角
    n_steps = 150  # 迭代步数

    print(f"\n[损失地形] L(θ) = 0.5·({landscape.a}·θ₁² + {landscape.b}·θ₂²)")
    print(f"  条件数 κ = {landscape.a/landscape.b:.0f} (狭长峡谷)")
    print(f"  初始位置 θ₀ = {theta_init}")
    print(f"  迭代步数: {n_steps}")

    # ---- 2. 创建优化器 ----
    optimizers = [
        SGDOptimizer(lr=0.02),
        MomentumOptimizer(lr=0.02, beta=0.9),
        RMSPropOptimizer(lr=0.05, beta=0.9),
        AdamOptimizer(lr=0.1, beta1=0.9, beta2=0.999),
    ]

    # ---- 3. 运行优化并记录轨迹 ----
    print("\n[训练] 运行各优化器...")
    all_trajectories = {}
    all_losses = {}

    for opt in optimizers:
        traj, losses = run_optimizer(opt, landscape, theta_init, n_steps)
        all_trajectories[opt.name] = traj
        all_losses[opt.name] = losses

        final_loss = losses[-1]
        final_dist = np.linalg.norm(traj[-1] - landscape.optimum)
        print(f"  {opt.name:<10}: 最终损失={final_loss:.2e}, "
              f"距最优解={final_dist:.4f}")

    # ---- 4. 打印对比总结表 ----
    print("\n" + "=" * 70)
    print("【优化器对比总结】")
    print("=" * 70)
    print(f"{'优化器':<12} {'最终损失':<16} {'距最优解':<14} {'记忆量'}")
    print("-" * 70)
    for opt in optimizers:
        final_loss = all_losses[opt.name][-1]
        final_dist = np.linalg.norm(all_trajectories[opt.name][-1] - landscape.optimum)
        memory = "0" if isinstance(opt, SGDOptimizer) else \
                 "m_t" if isinstance(opt, MomentumOptimizer) else \
                 "v_t" if isinstance(opt, RMSPropOptimizer) else \
                 "m_t, v_t"
        print(f"{opt.name:<12} {final_loss:<16.6e} {final_dist:<14.6f} {memory}")

    # 底部收敛步数(以损失 < 0.001 为标准)
    print(f"\n{'达到 L<0.001 所需步数:':<20}")
    for opt in optimizers:
        losses_arr = np.array(all_losses[opt.name])
        steps = np.argmax(losses_arr < 0.001) if np.any(losses_arr < 0.001) else "未达到"
        print(f"  {opt.name:<10}: {steps}")
    print("=" * 70)

    # ---- 5. 可视化 ----
    print("\n[可视化] 生成图表...")

    # 5a. 轨迹对比图
    plot_contour_comparison(landscape, all_trajectories)

    # 5b. 损失曲线
    plot_loss_curves(all_losses)

    # 5c. 超参数游乐场
    plot_hyperparameter_playground(landscape, theta_init)

    # ---- 6. 带噪声的 SGD 演示 ----
    print("\n[额外演示] Mini-batch 噪声对 SGD 的影响...")
    noisy_sgd = SGDOptimizer(lr=0.02)
    noisy_adam = AdamOptimizer(lr=0.1)

    traj_sgd_noisy, loss_sgd_noisy = run_optimizer(
        noisy_sgd, landscape, theta_init, n_steps=100,
        add_noise=True, noise_std=1.0
    )
    traj_adam_noisy, loss_adam_noisy = run_optimizer(
        noisy_adam, landscape, theta_init, n_steps=100,
        add_noise=True, noise_std=1.0
    )

    print(f"  SGD+噪声:   最终损失={loss_sgd_noisy[-1]:.4f}")
    print(f"  Adam+噪声:   最终损失={loss_adam_noisy[-1]:.4f}")
    print(f"  Adam 对梯度噪声的鲁棒性远优于 SGD!")

    # 噪声对比图
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(loss_sgd_noisy, 'r-', linewidth=1.5, alpha=0.7, label='SGD + Noise')
    ax.plot(loss_adam_noisy, 'b-', linewidth=1.5, alpha=0.7, label='Adam + Noise')
    ax.set_xlabel('Iteration', fontsize=12)
    ax.set_ylabel('Loss', fontsize=12)
    ax.set_title('Robustness Comparison Under Gradient Noise (sigma=1.0)', fontsize=13, fontweight='bold')
    ax.set_yscale('log')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    out = os.path.join(_IMAGES, 'noise_robustness.png')
    plt.savefig(out, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[可视化] 噪声鲁棒性对比图已保存至 {out}")

    # ---- 7. 总结 ----
    print("\n" + "=" * 70)
    print("【总结】")
    print("=" * 70)
    print("  ✓ 在狭长峡谷形损失地形上对比了 4 种优化器")
    print("  ✓ SGD 沿陡峭方向震荡、平缓方向进展慢 → 锯齿路径")
    print("  ✓ Momentum 通过惯性平滑方向 → 路径更直")
    print("  ✓ RMSProp 自适应步长 → 陡峭方向步长变小")
    print("  ✓ Adam 结合两者 → 方向平滑 + 步长自适应 == 最快收敛")
    print("  ✓ Adam 对梯度噪声(mini-batch 噪声)的鲁棒性远优于 SGD")
    print("=" * 70)


if __name__ == "__main__":
    main()