s18 大语言模型 — demo.py 代码详解
运行方式
cd s18_large_language_models/code
python demo.py依赖:numpy, torch, matplotlib
代码逐段详解
第1步:Scaling Law — 语言模型损失的幂律下降
1.1 Kaplan Scaling Law
Kaplan et al. (2020) 发现语言模型的测试损失
其中
def kaplan_loss(N, D, a=1.5, b=2.0, alpha=0.076, beta=0.095, c=1.0):
return a / (N ** alpha) + b / (D ** beta) + c逐参数解释:
| 参数 | 值 | 含义 |
|---|---|---|
| 变量 | 模型参数量 | |
| 变量 | 训练数据量(token 数) | |
| 固定 | ||
| 固定 | ||
| 固定 | 不可约减损失的下界 | |
| 固定 | 比例系数 |
幂律的含义:在 log-log 图上,损失随参数量的增加呈直线下降——这意味着每将参数量翻倍,损失按固定比例减少。这与直觉"收益递减"一致——从 1M 到 10M 参数的效果提升远大于从 100B 到 1T。
1.2 Chinchilla 最优配比
2022 年 DeepMind 的 Chinchilla 论文指出:Kaplan 的计算最优分配偏向于"模型大、数据少",但正确的做法是数据和参数同步增长。
def chinchilla_optimal_D(N):
return 20.0 * N实际含义:用 175B 参数的 GPT-3 应该训练约 3.5T tokens,但它只用了约 300B tokens——GPT-3 是"欠训练"的。按照 Chinchilla 最优配比,在 GPT-3 的算力预算下,训练一个 70B 参数 + 1.4T tokens 的模型(如 Chinchilla 或 LLaMA 7B)反而效果更好。
1.3 可视化
代码绘制了四张子图,展示 Scaling Law 的四个维度:
- 图 1:
— 损失 vs 模型大小:log-log 图上的直线,标注 GPT-1(117M)、GPT-2(1.5B)、GPT-3(175B) 的位置 - 图 2:
— 损失 vs 数据量 - 图 3:
— 损失 vs 计算量( ,训练所需的 FLOPs) - 图 4: Chinchilla 等高线:显示不同
组合下的损失,红色虚线标出了 的最优线
# GPT-3 位置:参数很大但数据不足,位于最优线右下方
ax4.scatter([1.75e11], [3e11], color='orange', s=100, marker='s')
ax4.annotate('GPT-3\n(Undertrained)', (2e11, 4e11))
# LLaMA 7B 位置:参数较小但数据充足,位于最优线附近
ax4.scatter([7e9], [1e12], color='green', s=100, marker='^')
ax4.annotate('LLaMA 7B\n(Near-optimal)', (1e10, 1.5e12))第2步:涌现能力模拟 — 量变引起质变
2.1 什么是涌现?
涌现(Emergence)是指:某些能力在小模型中完全不存在(表现为随机水平),但当模型规模跨过某个阈值后,性能突然跃升到接近完美的水平。
2.2 Sigmoid 模型模拟涌现
代码使用 sigmoid 函数 来模拟涌现的"相位转变"行为:
def simulate_emergence(param_sizes, task, emergent=True, threshold=1e9, noise_level=0.05):
if emergent:
# Sigmoid 模拟相位转变:
# 1 / (1 + exp(-k * (log10(N) - log10(threshold))))
accuracies = 1.0 / (1.0 + np.exp(
-1.5 * (np.log10(param_sizes) - np.log10(threshold))
))
accuracies = 0.05 + 0.85 * accuracies # 基线 5% + 最大提升 85%
else:
# 非涌现:平滑线性增长
accuracies = 0.1 + 0.8 * (np.log10(param_sizes) - 6.0) / 6.0
accuracies = np.clip(accuracies, 0.1, 0.95)
accuracies += np.random.normal(0, noise_level, len(param_sizes)) # 加噪声
return np.clip(accuracies, 0.0, 1.0)Sigmoid 函数的妙用:当
2.3 模拟的六类任务
| 任务 | 涌现? | 涌现阈值 | 表现特征 |
|---|---|---|---|
| 3 位数加减法 | 是 | ~8B | 8B 前随机,8B 后 ~90% |
| 多语言翻译 | 是 | ~10B | 训练数据中无平行语料 |
| Chain-of-Thought (CoT) | 是 | ~60B | 能"一步步思考" |
| 指令遵循 | 是 | ~30B | 理解并执行自然语言指令 |
| 情感分析 | 否 | — | 平滑增长,小模型也能做 |
| 词性标注 | 否 | — | 平滑增长 |
涌现 vs 非涌现的本质区别:非涌现任务(如情感分析)的准确率从小模型到大模型一直是平滑上升的——因为情感极性判断所需的基础语言能力在小模型中已存在,规模增大只是提升了精确度。而涌现任务涉及技能的组合(如多语言翻译 = 语言理解 + 生成 + 跨语言对齐),小模型中这些子技能不足以组合成一个新能力,直到模型足够大时才能"涌现"出来。
第3步:DPO — 直接偏好优化
3.1 为什么需要 DPO?
RLHF(Reinforcement Learning from Human Feedback)是让 LLM 与人类偏好对齐的标准方法,但它需要:
- 单独训练一个奖励模型
- 用 PPO 做强化学习优化(训练不稳定,超参数敏感)
- 整个过程需要四个模型同时运行(策略模型、参考模型、奖励模型、价值模型)
DPO(Rafailov et al., 2023)提出了一种更简洁的方案:直接从偏好数据优化策略,不需要独立的奖励模型。
3.2 DPO 损失函数
DPO 利用了一个关键的数学洞察:"语言模型本身隐含地就是一个奖励模型"。由此推导出的损失函数为:
化简后的实现:
def dpo_loss(pi_logps_chosen, pi_logps_rejected,
ref_logps_chosen, ref_logps_rejected, beta=0.1):
# 策略模型:好回答 log P - 差回答 log P
pi_diff = pi_logps_chosen - pi_logps_rejected
# 参考模型:好回答 log P - 差回答 log P
ref_diff = ref_logps_chosen - ref_logps_rejected
# DPO 的隐式奖励:beta * (策略模型的偏好差异 - 参考模型的偏好差异)
logits = beta * (pi_diff - ref_diff)
# 二分类交叉熵损失:-log σ(logits)
loss = -F.logsigmoid(logits).mean()
return loss逐行解释:
pi_diff = pi_logps_chosen - pi_logps_rejected:策略模型对好回答的对数概率减去对差回答的对数概率。如果策略模型正确地更偏好好的回答,这个值应该是正的(好回答的 log 概率更高)。 ref_diff = ref_logps_chosen - ref_logps_rejected:参考模型(通常是 SFT 模型,训练时冻结)的对应差值。这提供了一个基准线。 logits = beta * (pi_diff - ref_diff):策略模型相对于参考模型的偏好改善程度。是 KL 惩罚系数——控制策略模型可以偏离参考模型多远。 越大,偏离越自由; 越小,策略模型越接近参考模型。 -F.logsigmoid(logits).mean():标准二分类交叉熵损失。logsigmoid等价于log(σ(x))。因为本身是负的( ),取负号后损失为正。
数值示例:代码展示了两种场景的 DPO 损失对比:
| 场景 | 策略模型表现 | DPO 损失 |
|---|---|---|
| 正确偏好 | 好回答 log P=-2, 差回答 log P=-5 | 较小(模型已学会偏好) |
| 错误偏好 | 好/差回答概率接近 | 较大(模型未区分优劣) |
3.3 模拟 DPO 训练
代码模拟了 50 个偏好对上 DPO 的训练过程:
- 随着训练进行,策略模型越来越好地学会区分好回答和差回答
- DPO 损失从高值逐渐下降到低值
- 这与真实 DPO 训练的行为一致
第4步:LoRA — 低秩适配
4.1 LoRA 的核心思想
全参数微调一个 175B 的模型需要数百 GB 显存。LoRA(Hu et al., 2021)的核心洞见是:模型适应新任务时,权重的更新矩阵
其中:
:原始权重(冻结,不参与训练) , :可训练的低秩矩阵 :秩,通常 8-64
4.2 LoRA 实现
class LoRALinear(nn.Module):
def __init__(self, in_features, out_features, r=8, alpha=16.0):
# 原始权重:冻结,不参与训练
self.register_buffer('W', torch.randn(out_features, in_features) * 0.02)
# LoRA 低秩矩阵:可训练
self.lora_A = nn.Parameter(torch.randn(r, in_features) * 0.02) # (r, in)
self.lora_B = nn.Parameter(torch.zeros(out_features, r)) # (out, r)
self.scaling = alpha / r
def forward(self, x):
original = x @ self.W.T # 原始路径(冻结)
lora_out = x @ self.lora_A.T @ self.lora_B.T # LoRA 路径: x → A → B
return original + self.scaling * lora_out # Wx + (α/r) BAx关键设计分析:
self.lora_B初始化为零:开始时,LoRA 路径输出全零,模型行为完全等同于原始模型。这保证了微调从预训练模型的原始性能开始。 缩放因子
alpha / r:控制 LoRA 更新的幅度。通常 (当 时缩放为 2)。缩放因子越大,LoRA 更新的影响力越大。 register_buffer存储原始权重:buffer 不会被 optimizer 追踪(不参与梯度计算),确保原始权重在训练过程中保持冻结。参数效率:对于一个
的全连接层: - 全参数训练:
参数 - LoRA (
): 参数 - 减少
!
- 全参数训练:
关键概念速查表
| 概念 | 公式/描述 | 一句话 |
|---|---|---|
| Kaplan Scaling Law | 损失随参数/数据量幂律下降 | |
| Chinchilla 最优 | 数据和参数需同步增长 | |
| 不可约减损失 | 数据固有的最小损失 | 无论模型多大都无法消除 |
| 涌现 | Sigmoid 相位转变 | 越过阈值后能力突然跃升 |
| RLHF | SFT → Reward Model → PPO | 三段式对齐 pipeline |
| DPO | 从偏好数据直接优化,无需奖励模型 | |
| DPO 的 | KL 惩罚系数 | 控制偏离参考模型的程度 |
| LoRA | 低秩适配,参数减少 100-1000x | |
| LoRA 秩 | 通常 8-64 | 越小参数越少,但可能欠拟合 |
| LoRA 目标模块 | q_proj, v_proj, k_proj, o_proj | Qwen/Llama 系列通常在注意力投影上加 LoRA |
完整代码
# -*- coding: utf-8 -*-
"""
s18 大语言模型 demo:Scaling, Emergence, LoRA, DPO
====================================================
本文件演示大语言模型的核心概念:
1. Scaling Law 可视化(Kaplan + Chinchilla)
2. 涌现行为模拟(算术能力 vs 模型大小)
3. LoRA 低秩微调(使用 PEFT 库)
4. DPO 偏好优化(使用 TRL 库)
5. 指令遵循对比
运行方式:在 s18_large_language_models 目录下执行 `python code/demo.py`
依赖:torch, transformers, peft, trl, matplotlib
注意:LoRA/DPO 部分需要下载小模型(~500MB,支持消费级硬件)
"""
import numpy as np
import math
from typing import List, Tuple, Dict
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# GPU 自动检测
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
print(f"使用设备: {DEVICE}")
if DEVICE.type == 'cpu':
print("(未检测到 GPU,使用 CPU 运行。如有 GPU,请安装 CUDA 版 PyTorch 以获得加速)")
# ====== 可选:使用 LLM API ======
# 如需使用真实 LLM API,请设置环境变量:
# export OPENAI_API_KEY=your-key
# export OPENAI_BASE_URL=https://api.openai.com/v1
# 然后将 USE_API = False 改为 True
USE_API = False
import matplotlib.pyplot as plt
import matplotlib
# 中文字体配置
matplotlib.rcParams['axes.unicode_minus'] = False
import os
_HERE = os.path.dirname(os.path.abspath(__file__))
_IMAGES = os.path.join(_HERE, '..', 'images')
os.makedirs(_IMAGES, exist_ok=True)
# ============================================================
# 第一部分:Scaling Law 可视化
# ============================================================
print("=" * 60)
print("[Scaling Law] 语言模型损失的幂律下降")
print("=" * 60)
def kaplan_loss(N: float, D: float, a: float = 1.5, b: float = 2.0, alpha: float = 0.076, beta: float = 0.095, c: float = 1.0) -> float:
"""
Kaplan 等人提出的 Scaling Law:
L(N, D) = a/N^α + b/D^β + c
参数:
N: 模型参数量
D: 训练数据量 (tokens)
a, b: 系数
α, β: 幂律指数
c: 不可约减损失 (irreducible loss)
返回:
预测的测试损失
"""
return a / (N ** alpha) + b / (D ** beta) + c
def chinchilla_optimal_D(N: float) -> float:
"""
Chinchilla 最优配比: D ≈ 20 × N
参数:
N: 模型参数量
返回:
最优的训练 token 数
"""
return 20.0 * N
# 绘制 Scaling Law 曲线
fig, axes = plt.subplots(2, 2, figsize=(14, 11))
# 图 1: 损失 vs 模型大小
N_range = np.logspace(6, 12, 100) # 1M → 1T 参数
D_fixed = 3e11 # 固定数据量 (300B tokens, 类似 GPT-3)
losses_N = [kaplan_loss(N, D_fixed) for N in N_range]
ax1 = axes[0, 0]
ax1.loglog(N_range, losses_N, 'b-', linewidth=2)
ax1.scatter([1.17e8, 1.5e9, 1.75e11], [kaplan_loss(1.17e8, D_fixed), kaplan_loss(1.5e9, D_fixed), kaplan_loss(1.75e11, D_fixed)],
color='red', s=80, zorder=5)
ax1.annotate('GPT-1\n117M', (1.5e8, kaplan_loss(1.17e8, D_fixed) + 0.1), fontsize=8, color='red')
ax1.annotate('GPT-2\n1.5B', (2e9, kaplan_loss(1.5e9, D_fixed) + 0.08), fontsize=8, color='red')
ax1.annotate('GPT-3\n175B', (2e11, kaplan_loss(1.75e11, D_fixed) + 0.08), fontsize=8, color='red')
ax1.set_xlabel("Model Parameters N", fontsize=11)
ax1.set_ylabel("Test Loss L", fontsize=11)
ax1.set_title("L(N) ∝ N^(-α), α≈0.076", fontsize=12)
ax1.grid(True, alpha=0.3, which='both')
ax1.axhline(y=1.0, color='gray', linestyle='--', alpha=0.5, label='Irreducible Loss c')
ax1.legend(fontsize=9)
# 图 2: 损失 vs 数据量
D_range = np.logspace(7, 13, 100) # 10M → 10T tokens
N_fixed = 7e9 # 固定模型大小 (7B 参数, 类似 LLaMA)
losses_D = [kaplan_loss(N_fixed, D) for D in D_range]
ax2 = axes[0, 1]
ax2.loglog(D_range, losses_D, 'g-', linewidth=2)
ax2.set_xlabel("Training Data D (tokens)", fontsize=11)
ax2.set_ylabel("Test Loss L", fontsize=11)
ax2.set_title("L(D) ∝ D^(-β), β≈0.095", fontsize=12)
ax2.grid(True, alpha=0.3, which='both')
# 图 3: 损失 vs 计算量
C_range = np.logspace(-3, 6, 100) # PF-days
# 计算量大约 C ≈ 6ND (经验公式)
losses_C = [kaplan_loss(np.sqrt(c/6*1e15), np.sqrt(c/6*1e15)) for c in C_range]
ax3 = axes[1, 0]
ax3.loglog(C_range, losses_C, 'purple', linewidth=2)
ax3.set_xlabel("Compute C (PF-days)", fontsize=11)
ax3.set_ylabel("Test Loss L", fontsize=11)
ax3.set_title("L(C) ∝ C^(-γ), γ≈0.057", fontsize=12)
ax3.grid(True, alpha=0.3, which='both')
# 图 4: Chinchilla 最优配比等高线
N_vals = np.logspace(6, 11, 50)
D_vals = np.logspace(8, 13, 50)
NN, DD = np.meshgrid(N_vals, D_vals)
LL = kaplan_loss(NN, DD)
ax4 = axes[1, 1]
contour = ax4.contour(np.log10(NN), np.log10(DD), LL, levels=10, cmap='RdYlBu_r')
ax4.clabel(contour, inline=True, fontsize=8)
# Chinchilla 最优线: D = 20N
optimal_D = [chinchilla_optimal_D(n) for n in N_vals]
ax4.loglog(N_vals, optimal_D, 'r--', linewidth=2, label='Chinchilla Optimal D≈20N')
# GPT-3 点
ax4.scatter([1.75e11], [3e11], color='orange', s=100, zorder=5, marker='s')
ax4.annotate('GPT-3\n(Undertrained)', (2e11, 4e11), fontsize=8, color='darkorange')
# LLaMA 7B 点
ax4.scatter([7e9], [1e12], color='green', s=100, zorder=5, marker='^')
ax4.annotate('LLaMA 7B\n(Near-optimal)', (1e10, 1.5e12), fontsize=8, color='green')
ax4.set_xlabel("Model Parameters N (log)", fontsize=11)
ax4.set_ylabel("Training Tokens D (log)", fontsize=11)
ax4.set_title("Chinchilla Optimal Ratio: D ≈ 20N", fontsize=12)
ax4.legend(fontsize=9)
ax4.grid(True, alpha=0.3)
plt.suptitle("Scaling Laws of Language Models", fontsize=15, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'scaling_laws.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[可视化] Scaling Laws 图表已保存至 images/scaling_laws.png")
print(f"\n[计算示例]")
print(f" 7B 模型 + 300B tokens: L={kaplan_loss(7e9, 3e11):.3f}")
print(f" 7B 模型 + 1.4T tokens (Chinchilla最优): L={kaplan_loss(7e9, 1.4e12):.3f}")
print()
# ============================================================
# 第二部分:涌现行为模拟
# ============================================================
print("=" * 60)
print("[涌现行为] 算术能力 vs 模型大小")
print("=" * 60)
def simulate_emergence(
param_sizes: np.ndarray, # 模型大小列表
task: str, # 任务名称
emergent: bool = True, # 是否有涌现特性
threshold: float = 1e9, # 涌现阈值
noise_level: float = 0.05, # 噪声水平
) -> np.ndarray:
"""
模拟不同模型大小下的任务准确率。
涌现任务:在阈值前接近随机,阈值后快速跃升。
非涌现任务:平滑增长。
参数:
param_sizes: 模型参数量数组
task: 任务名称
emergent: 是否为涌现任务
threshold: 涌现阈值
noise_level: 噪声水平
返回:
accuracies: 模拟的准确率数组
"""
if emergent:
# 涌现:sigmoid 函数模拟相位转变
# logistic function: 1 / (1 + exp(-k*(x - threshold)))
accuracies = 1.0 / (1.0 + np.exp(-1.5 * (np.log10(param_sizes) - np.log10(threshold))))
# 加入"近随机"基线
accuracies = 0.05 + 0.85 * accuracies
else:
# 非涌现:平滑的线性 + 轻微的指数增长
accuracies = 0.1 + 0.8 * (np.log10(param_sizes) - 6.0) / 6.0
accuracies = np.clip(accuracies, 0.1, 0.95)
# 加噪声
accuracies += np.random.normal(0, noise_level, len(param_sizes))
return np.clip(accuracies, 0.0, 1.0)
# 设定模型大小范围
param_range = np.logspace(6, 12, 30) # 1M → 1T
# 模拟 6 个任务
tasks = {
"3-Digit Arithmetic": (True, 8e9), # Emergent, threshold ~8B
"Multilingual Translation": (True, 1e10), # Emergent, threshold ~10B
"Chain-of-Thought (CoT)": (True, 6e10), # Emergent, threshold ~60B
"Instruction Following": (True, 3e10), # Emergent, threshold ~30B
"Sentiment Analysis": (False, None), # Non-emergent: smooth growth
"POS Tagging": (False, None), # Non-emergent: smooth growth
}
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
axes = axes.flatten()
colors_emergent = ['#E53935', '#E53935', '#E53935', '#E53935']
colors_smooth = ['#1E88E5', '#1E88E5']
emergent_idx = 0
smooth_idx = 0
for ax, (task_name, (is_emergent, threshold)) in zip(axes, tasks.items()):
accs = simulate_emergence(param_range, task_name, emergent=is_emergent, threshold=threshold)
color = colors_emergent[emergent_idx] if is_emergent else colors_smooth[smooth_idx]
if is_emergent:
emergent_idx += 1
else:
smooth_idx += 1
ax.semilogx(param_range, accs * 100, 'o-', color=color, linewidth=1.5, markersize=4)
ax.set_xlabel("Model Parameters", fontsize=9)
ax.set_ylabel("Accuracy (%)", fontsize=9)
ax.set_title(f"{task_name} {'(Emergent)' if is_emergent else '(Smooth Growth)'}", fontsize=11)
if is_emergent and threshold:
ax.axvline(x=threshold, color='gray', linestyle='--', alpha=0.5)
ax.annotate(f'Emergence threshold\n~{threshold/1e9:.0f}B',
xy=(threshold, 50), fontsize=8, color='red',
ha='left',
arrowprops=dict(arrowstyle='->', color='red', lw=1))
ax.set_ylim(0, 105)
ax.grid(True, alpha=0.3)
ax.axhline(y=10, color='gray', linestyle=':', alpha=0.3, label='Random baseline 10%' if task_name == "3-Digit Arithmetic" else '')
plt.suptitle("Emergent vs Smooth Growth: Task Behavior by Model Scale", fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'emergent_abilities.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[可视化] 涌现能力图已保存至 images/emergent_abilities.png")
print()
print(" 涌现能力特征: 小模型≈随机水平 → 跨过阈值 → 大幅跃升")
print(" 非涌现能力: 随模型大小平滑增长,小模型也能做")
print()
# ============================================================
# 第三部分:DPO 损失函数实现(核心算法理解)
# ============================================================
print("=" * 60)
print("[DPO] 直接偏好优化损失函数")
print("=" * 60)
def dpo_loss(
pi_logps_chosen: torch.Tensor, # 策略模型对偏好回答的 log 概率 (batch,)
pi_logps_rejected: torch.Tensor, # 策略模型对较差回答的 log 概率 (batch,)
ref_logps_chosen: torch.Tensor, # 参考模型对偏好回答的 log 概率 (batch,)
ref_logps_rejected: torch.Tensor, # 参考模型对较差回答的 log 概率 (batch,)
beta: float = 0.1, # KL 惩罚系数
) -> torch.Tensor:
"""
计算 DPO 损失。
DPO 损失公式:
L_DPO = -log σ( β·log[π_θ(y_w|x)/π_ref(y_w|x)] - β·log[π_θ(y_l|x)/π_ref(y_l|x)] )
其中 π_θ 是要优化的策略模型,π_ref 是参考模型(通常是 SFT 模型),
y_w 是人类偏好的回答 (winner/wanted),y_l 是较差回答 (loser)。
直觉: 如果策略模型给好回答的概率比参考模型高,且给差回答的概率比参考模型低,
则损失小。反之,如果策略模型在好回答和差回答上的表现和参考模型一样
(甚至更差),损失大。
参数:
pi_logps_chosen: log π_θ(y_w | x), 策略模型下偏好回答的对数概率
pi_logps_rejected: log π_θ(y_l | x), 策略模型下较差回答的对数概率
ref_logps_chosen: log π_ref(y_w | x), 参考模型下偏好回答的对数概率
ref_logps_rejected: log π_ref(y_l | x), 参考模型下较差回答的对数概率
beta: KL 惩罚系数,越大越鼓励模型偏离参考模型(但也越容易过拟合)
返回:
loss: DPO 损失值
"""
# 计算策略模型与参考模型在 log 概率上的差异
pi_diff = pi_logps_chosen - pi_logps_rejected # 策略模型: 好回答 - 差回答
ref_diff = ref_logps_chosen - ref_logps_rejected # 参考模型: 好回答 - 差回答
# 加权差异,乘以 beta
logits = beta * (pi_diff - ref_diff) # DPO 论文中的隐式奖励
# 二分类交叉熵损失: -log σ(logits)
loss = -F.logsigmoid(logits).mean()
return loss
# 模拟 DPO 训练的损失变化
print("\n[模拟] DPO 训练过程中的损失变化...")
torch.manual_seed(42)
# 假设 50 个偏好对
num_pairs = 50
# 模拟:随着训练进行,模型越来越好地学会偏好
dpo_losses = []
for step_ratio in np.linspace(0.01, 1.0, 30):
# 模拟策略模型越来越好(与参考模型的差异越来越大)
pi_diff_base = step_ratio * 3.0 # 初期差异小,后期差异大
pi_diff_chosen = pi_diff_base + torch.randn(num_pairs) * 0.5
pi_diff_rejected = -pi_diff_base + torch.randn(num_pairs) * 0.5
ref_diff_chosen = torch.zeros(num_pairs) # 参考模型差异为 0(它是固定的)
ref_diff_rejected = torch.zeros(num_pairs)
loss = dpo_loss(
pi_logps_chosen=pi_diff_chosen,
pi_logps_rejected=pi_diff_rejected,
ref_logps_chosen=ref_diff_chosen,
ref_logps_rejected=ref_diff_rejected,
beta=0.1,
)
dpo_losses.append(loss.item())
# 绘制 DPO 训练损失曲线
plt.figure(figsize=(8, 4))
plt.plot(np.linspace(0.01, 1.0, 30), dpo_losses, 'o-', color='#00897B', linewidth=1.5, markersize=4)
plt.xlabel("Training Progress", fontsize=12)
plt.ylabel("DPO Loss", fontsize=12)
plt.title("DPO Training Loss (Simulated): Model Learns to Prefer Good Responses", fontsize=13, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'dpo_training_loss.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[可视化] DPO 训练损失曲线已保存至 images/dpo_training_loss.png")
print()
# 展示 DPO 损失的计算示例
# 场景 1: 模型正确偏好 (好回答概率高,差回答概率低)
print("[DPO 数值示例]")
pi_good = torch.tensor([-2.0, -2.5, -1.8]) # log 概率(负数)
pi_bad = torch.tensor([-5.0, -5.5, -4.8])
ref_good = torch.tensor([-3.0, -3.0, -3.0])
ref_bad = torch.tensor([-3.0, -3.0, -3.0])
loss_correct = dpo_loss(pi_good, pi_bad, ref_good, ref_bad, beta=0.1)
print(f" 正确偏好场景 (好回答log P=-2, 差回答log P=-5): Loss={loss_correct.item():.4f} (应该较小)")
# 场景 2: 模型错误偏好 (好回答和差回答的概率差不多)
pi_bad_model = torch.tensor([-3.0, -3.2, -2.9])
pi_bad_bad = torch.tensor([-3.1, -3.3, -3.0])
loss_wrong = dpo_loss(pi_bad_model, pi_bad_bad, ref_good, ref_bad, beta=0.1)
print(f" 错误偏好场景 (好/差回答概率接近): Loss={loss_wrong.item():.4f} (应该较大)")
# ============================================================
# 第四部分:LoRA 配置(概念演示)
# ============================================================
print("\n" + "=" * 60)
print("[LoRA] 低秩适配概念演示")
print("=" * 60)
class LoRALinear(nn.Module):
"""
简化的 LoRA 线性层(概念演示)。
LoRA 不修改原始权重 W,而是学习一个低秩更新 ΔW = BA:
h = Wx + (α/r) * B A x
参数:
in_features: 输入维度
out_features: 输出维度
r: LoRA 秩(低秩分解的秩,通常 8-64)
alpha: LoRA 缩放系数
"""
def __init__(self, in_features: int, out_features: int, r: int = 8, alpha: float = 16.0):
super().__init__()
# 原始权重(冻结,不参与训练)
self.register_buffer('W', torch.randn(out_features, in_features) * 0.02)
# LoRA 低秩矩阵 A 和 B
# A: (r, in_features), 用 Kaiming 初始化
self.lora_A = nn.Parameter(torch.randn(r, in_features) * 0.02)
# B: (out_features, r), 初始化为 0(开始时 ΔW = 0 × A = 0)
self.lora_B = nn.Parameter(torch.zeros(out_features, r))
self.r = r
self.alpha = alpha
self.scaling = alpha / r # 缩放因子
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播: h = Wx + scaling * B @ A @ x
参数:
x: 输入 (batch, in_features)
返回:
h: 输出 (batch, out_features)
"""
# 原始路径(不计算梯度)
original = x @ self.W.T # (batch, out_features)
# LoRA 路径(低秩更新)
lora_out = x @ self.lora_A.T @ self.lora_B.T # x → (r,) → (out_features,)
return original + self.scaling * lora_out
# 比较 LoRA 的参数量
d = 4096 # d_model
r = 16 # LoRA 秩
vanilla_params = d * d # 一个全连接层的参数
lora_params = 2 * d * r # A: (r, d) + B: (d, r)
print(f"\n 全参数训练: {vanilla_params:,} 参数")
print(f" LoRA (r={r}): {lora_params:,} 参数 ({lora_params/vanilla_params*100:.2f}%)")
print(f" 参数量减少: {vanilla_params/lora_params:.0f}x")
print()
# 模拟 LoRA 微调过程
print("[模拟] LoRA 微调: 模型学习新任务...")
lora_layer = LoRALinear(256, 256, r=8, alpha=16.0).to(DEVICE)
optimizer = optim.Adam(lora_layer.parameters(), lr=0.01)
# 模拟一个简单的回归任务
lora_losses = []
for epoch in range(100):
x_batch = torch.randn(16, 256).to(DEVICE)
y_batch = torch.randn(16, 256).to(DEVICE) # 目标
optimizer.zero_grad()
pred = lora_layer(x_batch)
loss = F.mse_loss(pred, y_batch)
loss.backward()
optimizer.step()
lora_losses.append(loss.item())
plt.figure(figsize=(8, 4))
plt.plot(lora_losses, color='#7B1FA2', linewidth=1.5)
plt.xlabel("Epoch", fontsize=12)
plt.ylabel("MSE Loss", fontsize=12)
plt.title("LoRA Fine-tuning Training Loss (Simulated)", fontsize=13, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'lora_training_loss.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[可视化] LoRA 训练损失曲线已保存至 images/lora_training_loss.png")
print()
# ============================================================
# 第五部分:总结
# ============================================================
print("=" * 60)
print("[总结] 大语言模型的核心概念")
print("=" * 60)
print("""
1. Scaling Law:
L(N, D) = a/N^α + b/D^β + c
损失随参数/数据量呈幂律下降
Chinchilla最优: D ≈ 20N
2. 涌现能力:
小模型不会 → 跨过阈值 → 突然会了
3位算术、CoT推理、指令遵循 —— 在~10B后涌现
3. 指令微调 (SFT):
用 (指令, 回复) 对训练模型执行任务
4. 对齐:
RLHF: SFT → Reward Model → PPO
DPO: 直接从偏好数据优化,无需奖励模型
5. LoRA:
不修改原始权重,学习低秩增量 ΔW = BA
参数量减少 100-1000x,消费级硬件即可微调大模型
下一站:
s22 多模态模型 — CLIP, 图文对齐
s23 RAG 与 Agent — 检索增强生成 + 工具调用
s24 部署与推理优化 — 量化, KV Cache, Flash Attention
""")
print("\n所有 demo 运行完成!图表已保存至 images/ 目录。")