Skip to content

s14 文本表示 — demo.py 代码详解

Download demo.py

运行方式

bash
cd s14_text_representation/code
python demo.py

代码逐段详解

第1步:导入库 —— 每个库是做什么的

python
import numpy as np                     # TF-IDF 矩阵、向量化计算
import math                            # log 运算(IDF 公式需要)
from collections import Counter        # 词频统计

import torch
import torch.nn as nn                  # Embedding 层(word2vec 输入/输出矩阵)
import torch.nn.functional as F        # logsigmoid(负采样损失)
import torch.optim as optim            # Adam 优化器
from torch.utils.data import Dataset, DataLoader  # Mini-batch 训练

import matplotlib.pyplot as plt        # TF-IDF 热力图、t-SNE 可视化
from sklearn.manifold import TSNE      # 高维词向量 → 2D 可视化
在此 demo 中的角色
numpyTF-IDF 矩阵存储和运算
collections.Counter词频统计 {"足球": 3, "比赛": 2, ...}
torch.nn.Embeddingword2vec 的输入嵌入矩阵 W 和输出嵌入矩阵 W
torch.nn.functional.logsigmoid负采样损失中的 logσ()
sklearn.manifold.TSNE高维词向量降维到 2D 可视化

第2步:中文语料库 —— 25 篇文档,5 个主题

python
CORPUS = [
    # 体育类(5篇)
    "足球比赛在工人体育场举行 观众热情高涨",
    "篮球运动员在训练中表现出色 投篮命中率很高",
    ...
    # 科技类(5篇)
    "人工智能技术正在改变各行各业的运作方式",
    ...
    # 教育类(5篇)、经济类(5篇)、医疗类(5篇)
]

为什么用中文? 对于中文读者,中文语料更直观,可以看到 TF-IDF 如何在中文文本上提取关键词(每个汉字都被视为一个 token)。实际项目中应使用 jieba 等分词工具,此处使用逐字切分是为了简化,避免依赖额外库。

分词函数

python
def tokenize(text: str) -> List[str]:
    text = text.replace(" ", "")  # 去空格
    return list(text)             # 逐字切分

第3步:TF-IDF —— 从零实现统计文本表示

TF-IDF 由两个独立的统计量相乘得到。

3.1 TF(词频):一个词在一篇文档中出现的频率

TF(w,d)=c(w,d)wc(w,d)
python
def compute_tf(doc_tokens):
    counter = Counter(doc_tokens)  # {"足球": 1, "比赛": 1, ...}
    total = len(doc_tokens)
    return {word: count / total for word, count in counter.items()}

为什么用相对频率而不是绝对频次? 长文档中所有词的绝对频次都更高。如果不做归一化,长文档的 TF-IDF 向量在数值上会"淹没"短文档,导致文档相似度被文档长度主导而非内容。

3.2 IDF(逆文档频率):一个词在整个语料库中的"稀有程度"

IDF(w)=logNdf(w)

其中 N 是总文档数,df(w) 是包含词 w 的文档数。

python
def compute_idf(tokenized_docs):
    N = len(tokenized_docs)
    idf = {}
    for doc in tokenized_docs:
        for word in set(doc):  # 每篇文档中每个词只计一次!
            idf[word] = idf.get(word, 0) + 1  # df(w)

    for word in idf:
        idf[word] = math.log((N + 1) / (idf[word] + 1)) + 1  # 平滑版公式
    return idf

为什么用 set(doc) IDF 关心的是包含该词的文档数量,而不是在文档中出现了几次。df(w) 是文档频率(Document Frequency),不是词频(Term Frequency)。

平滑的作用

  • 原始公式:log(N/df(w))
  • 平滑公式:log((N+1)/(df(w)+1))+1

平滑避免了当 df(w)=N(词在所有文档中出现)时 log(1)=0 导致的零权重。平滑后即使"万能词"也有一个小正值。

IDF 的含义

出现在几篇文档IDF(约)解释
"的"25 篇(全部)1高频但无信息量,IDF 低
"足球"1 篇3稀有词,IDF 高,区分力强

3.3 TF-IDF 矩阵构建

python
# 构建 (N_docs × V) 的 TF-IDF 矩阵
tfidf_matrix = np.zeros((len(CORPUS), V))
for doc_idx, doc_tokens in enumerate(tokenized_corpus):
    tf = compute_tf(doc_tokens)
    for word, tf_val in tf.items():
        word_idx = word_to_idx[word]
        tfidf_matrix[doc_idx, word_idx] = tf_val * idf_scores[word]

矩阵解读

  • 行 = 文档(25 行),列 = 词汇(~250 列)
  • tfidf_matrix[3, 15] = 文档 3 中词汇 15 的 TF-IDF 得分
  • 大部分元素为 0(每篇文档只包含少量词)——典型的高维稀疏矩阵

TF-IDF 关键词提取:对每篇文档取 TF-IDF 得分最高的 3 个词,这些词就是该文档的"关键词":

python
for doc_idx, doc in enumerate(CORPUS):
    doc_vec = tfidf_matrix[doc_idx]
    top_indices = np.argsort(doc_vec)[::-1][:3]  # 降序取前3
    top_words = [idx_to_word[i] for i in top_indices]

第4步:word2vec Skip-gram + 负采样 —— 从零实现

4.1 Skip-gram 训练数据构建

Skip-gram 的核心思想:给定中心词 wt,预测它周围的上下文词。

窗口大小为 2 的示例:
句子:  "今天 天气 非常 好"
窗口:  [中心=天气, 上下文={今天, 非常, 好}]  →  3 个训练对
python
def build_skipgram_pairs(tokenized_docs, window_size=2):
    pairs = []
    for doc in tokenized_docs:
        indices = [word_to_idx[w] for w in doc]
        for i, center in enumerate(indices):
            # 遍历窗口内的上下文词
            for j in range(max(0, i - window_size),
                           min(len(indices), i + window_size + 1)):
                if i != j:  # 不包含中心词自身
                    pairs.append((center, indices[j]))
    return pairs

为什么 Skip-gram 对罕见词效果好? CBOW 将上下文词向量取平均后预测中心词,这个平均操作会"抹平"罕见词的独特信息。Skip-gram 直接用中心词预测每个上下文词,每个词对都独立处理,罕见词的表示不会被平均化。

4.2 负采样 —— 让训练变得可行

为什么需要负采样? Skip-gram 的输出层是一个大小为 V 的 softmax。词汇表 V 可能有几万到几十万——每一步训练都要做 V-way 的 softmax,计算量巨大。

负采样的巧思:把"V 分类"变成"K+1 个二分类"(通常 K=5)。

  • 正样本:(wt,wc) —— wcwt 的真实上下文
  • 负样本:(wt,wrand) —— 随机采样的 K 个词

负采样分布:使用词频的 3/4 次方作为采样概率:

Pn(w)=freq(w)0.75wfreq(w)0.75
python
word_freq_pow = {w: count ** 0.75 for w, count in word_freq.items()}

为什么是 3/4 次方? 纯词频分布下,高频词(如"的"、"了")几乎包揽所有负采样,模型学会了"高频词不是上下文"的偏见。3/4 次方降低了高频词的过度优势,提高了中低频词的采样率,更均衡的负样本分布产生更好的词向量。

4.3 Skip-gram 模型

python
class SkipGramNegSampling(nn.Module):
    def __init__(self, vocab_size, embed_dim=100):
        # 输入嵌入矩阵 W (V × d) —— 训练后保留为词向量!
        self.in_embeddings = nn.Embedding(vocab_size, embed_dim)
        # 输出嵌入矩阵 W' (V × d) —— 辅助矩阵,训练后可丢弃
        self.out_embeddings = nn.Embedding(vocab_size, embed_dim)

两个嵌入矩阵的区别

矩阵形状含义作用
in_embeddingsV×d每行是词的输入向量 vw训练后保留这个作为词向量
out_embeddingsV×d每行是词的输出向量 uw辅助训练,通常丢弃

为什么保留 in_embeddings 而不是两者的平均? 两种做法都有人用。保留输入向量是最常见的做法(Gensim 的 word2vec 即如此)。在 GloVe 中会将输入和输出向量求和。

负采样损失函数

L=logσ(vwtuwc)i=1Klogσ(vwtuwi)
python
def forward(self, center_words, context_words, neg_words):
    v_center = self.in_embeddings(center_words)   # (batch, d)
    u_pos = self.out_embeddings(context_words)     # (batch, d)
    u_neg = self.out_embeddings(neg_words)         # (batch, K, d)

    # 正样本:希望 v_center · u_pos 很大 → sigmoid 接近 1 → log loss 小
    pos_score = torch.sum(v_center * u_pos, dim=1)  # (batch,)
    pos_loss = F.logsigmoid(pos_score).mean()       # -log σ(v·u_pos)

    # 负样本:希望 v_center · u_neg 很小 → sigmoid(-score) 接近 1 → log loss 小
    neg_score = torch.bmm(u_neg, v_center.unsqueeze(2)).squeeze(2)  # (batch, K)
    neg_loss = F.logsigmoid(-neg_score).sum(dim=1).mean()           # -log σ(-v·u_neg)

    return -(pos_loss + neg_loss)  # 总的负采样损失

损失函数的直觉

  • 第一项 F.logsigmoid(pos_score):正样本得分高 sigmoid 值大 log 值接近 0 损失小。即:鼓励中心词和正确的上下文词相似
  • 第二项 F.logsigmoid(-neg_score):负样本得分低 sigmoid 值大 log 值接近 0 损失小。即:鼓励中心词和随机的非上下文词不相似

第5步:词向量分析

5.1 t-SNE 降维可视化

python
# 选择频率最高的 100 个词
top_words = [w for w, _ in word_counts.most_common(100)]

# t-SNE 降维到 2D
tsne = TSNE(n_components=2, perplexity=30)
vectors_2d = tsne.fit_transform(top_vectors)

为什么用 t-SNE 而不是 PCA? PCA 是线性降维,只保留全局方差最大的方向。t-SNE 是非线性降维,专注于保持局部邻域结构——对词向量来说,我们关心的是"哪些词聚在一起",这正是 t-SNE 的强项。perplexity=30 是平衡局部和全局结构的常用值。

预期结果:"球"和"篮"(体育类)、"学"和"习"(教育类)在 t-SNE 投影中应该靠得很近。

5.2 近义词查询(余弦相似度)

python
def cosine_similarity(v1, v2):
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

为什么用余弦相似度而不是欧氏距离? 词向量的长度(幅度)通常受词频影响——高频词向量更长。余弦相似度只关心方向(角度),消除了词频对相似度判断的干扰。在 NLP 中,cos(v足球,v篮球)0.8v足球v篮球0.5 更有意义。

cos(a,b)=abab[1,1]

5.3 类比推理

python
def word_analogy(a, b, c, word_vectors, word_to_idx, idx_to_word):
    # a - b + c ≈ ?
    result_vec = word_vectors[a_idx] - word_vectors[b_idx] + word_vectors[c_idx]
    # 找与 result_vec 最相似的词

数学原理

vkingvman+vwomanvqueen

向量运算 vavb 捕捉了"从 a 到 b 的语义方向",加上 vc 就是将这个方向应用于 c。

注意:由于本 demo 语料库较小(25 篇文档),类比推理的效果有限。这个向量运算的特性需要在大规模语料(数十亿词)上才显著体现。代码展示了方法本身,在实际项目中可替换为大语料。

第6步:TF-IDF vs word2vec —— 全面对比

代码最后通过"文档级 TF-IDF 相似度"和"词级 word2vec 相似度"的对比,展示了两种方法的本质差异。

对比维度TF-IDFword2vec
表示粒度文档级(一篇文档 = 一个向量)词级(每个词 = 一个向量)
向量类型稀疏高维(V
语义能力仅统计频率通过上下文学习语义关系
词序完全忽略通过上下文窗口部分保留
相似度文档相似度(共享关键词)词语义相似度(共享上下文)
典型应用文档检索、关键词提取近义词查询、类比推理、下游模型初始化

关键概念速查表

概念公式代码对应
TFc(w,d)/c(w,d)compute_tf()
IDFlog(N/df(w))compute_idf()
TF-IDFTF×IDFtfidf_matrix[d,i]
Skip-gram中心词 → 上下文词build_skipgram_pairs()
负采样K 个随机词 vs 正样本SkipGramDataset
负采样损失logσ(vu+)logσ(vu)SkipGramNegSampling.forward()
噪声分布Pn(w)freq(w)0.75word_freq_pow
余弦相似度ab/|a||b|cosine_similarity()
词类比vavb+vcv?word_analogy()
分布式假设词的含义由上下文决定word2vec 的理论基础

完整代码

py
# -*- coding: utf-8 -*-
"""
s14 文本表示 demo:TF-IDF + word2vec Skip-gram
==============================================
本文件从零实现了两大文本表示方法:
  1. TF-IDF 向量化 — 统计词频 × 逆文档频率
  2. word2vec Skip-gram + 负采样 — 用神经网络学习稠密词向量

并通过 t-SNE 可视化、近义词查询、类比推理等实验,
直观对比两种方法的表达能力。

运行方式:在 s14_text_representation 目录下执行 `python code/demo.py`
依赖:numpy, torch, matplotlib, scikit-learn, scipy
"""

import numpy as np
import math
import random
from collections import Counter
from typing import List, Dict, Tuple, Set

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# GPU 自动检测
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
print(f"使用设备: {DEVICE}")
if DEVICE.type == 'cpu':
    print("(未检测到 GPU,使用 CPU 运行。如有 GPU,请安装 CUDA 版 PyTorch 以获得加速)")

import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False

import os
_HERE = os.path.dirname(os.path.abspath(__file__))
_IMAGES = os.path.join(_HERE, '..', 'images')
os.makedirs(_IMAGES, exist_ok=True)

# ============================================================
# 第一部分:中文语料库
# ============================================================

CORPUS = [
    # 体育类
    "足球比赛在工人体育场举行 观众热情高涨",
    "篮球运动员在训练中表现出色 投篮命中率很高",
    "游泳选手在奥运会中打破世界纪录 获得金牌",
    "乒乓球是中国最受欢迎的运动项目之一",
    "马拉松选手在雨中坚持跑到终点 令人感动",
    # 科技类
    "人工智能技术正在改变各行各业的运作方式",
    "机器学习模型在图像识别任务中超越了人类水平",
    "深度学习需要大量的数据和计算资源进行训练",
    "自然语言处理是人工智能的重要研究方向",
    "计算机视觉技术可以帮助自动驾驶汽车识别路况",
    # 教育类
    "大学教授正在讲授机器学习的基础理论知识",
    "学生在图书馆认真复习准备期末考试",
    "教育部发布了新的课程改革方案",
    "中小学生的课外辅导负担需要进一步减轻",
    "在线教育平台为偏远地区学生提供了优质课程",
    # 经济类
    "股票市场今天大幅上涨 投资者信心回升",
    "央行降低了基准利率以刺激经济增长",
    "房地产市场调控政策持续发力 房价趋于稳定",
    "国际贸易摩擦对全球经济造成了不确定性",
    "企业数字化转型成为经济高质量发展的关键",
    # 医疗类
    "医生建议人们定期体检以预防疾病",
    "新型疫苗的研发为疫情防控带来了希望",
    "心理健康问题越来越受到社会的关注",
    "中药在现代医学中的应用研究取得进展",
    "医院引进了先进的医疗设备提升诊疗水平",
]


def tokenize(text: str) -> List[str]:
    """
    对中文文本进行简单分词(按字符切分,实际项目中建议使用 jieba 等分词工具)。

    参数:
        text: 输入的中文文本字符串
    返回:
        分词后的词列表
    """
    # 去掉空格并按字符切分(简化的中文分词)
    text = text.replace(" ", "")
    return list(text)


# 对语料库进行分词
tokenized_corpus = [tokenize(doc) for doc in CORPUS]
# 构建词汇表
all_words = [word for doc in tokenized_corpus for word in doc]
vocab = sorted(set(all_words))
word_to_idx = {w: i for i, w in enumerate(vocab)}  # 词 → 索引映射
idx_to_word = {i: w for i, w in enumerate(vocab)}  # 索引 → 词映射
V = len(vocab)  # 词汇表大小

print(f"[语料统计] 文档数: {len(CORPUS)}, 词汇表大小: {V}")
print(f"[语料统计] 总词数: {len(all_words)}")
print()

# ============================================================
# 第二部分:TF-IDF 从零实现
# ============================================================

def compute_tf(doc_tokens: List[str]) -> Dict[str, float]:
    """
    计算单篇文档中每个词的词频 TF(w, d) = count(w, d) / total_words_in_doc

    参数:
        doc_tokens: 一篇文档的分词列表
    返回:
        dict: {词: TF值}
    """
    counter = Counter(doc_tokens)
    total = len(doc_tokens)
    if total == 0:
        return {}
    return {word: count / total for word, count in counter.items()}


def compute_idf(tokenized_docs: List[List[str]]) -> Dict[str, float]:
    """
    计算逆文档频率 IDF(w) = log(N / df(w)),其中 N 是总文档数,df(w) 是包含 w 的文档数。

    参数:
        tokenized_docs: 所有文档的分词列表
    返回:
        dict: {词: IDF值}
    """
    N = len(tokenized_docs)
    idf = {}
    for doc in tokenized_docs:
        # 文档内去重——每个词只算一次(df 是包含该词的文档数)
        for word in set(doc):
            idf[word] = idf.get(word, 0) + 1
    # IDF = log(N / df),加 1 平滑防止分母为 0
    for word in idf:
        idf[word] = math.log((N + 1) / (idf[word] + 1)) + 1
    return idf


# 计算全语料库的 IDF
idf_scores = compute_idf(tokenized_corpus)

# 构建 TF-IDF 矩阵:形状 (N_docs, V)
tfidf_matrix = np.zeros((len(CORPUS), V))
for doc_idx, doc_tokens in enumerate(tokenized_corpus):
    tf = compute_tf(doc_tokens)
    for word, tf_val in tf.items():
        if word in word_to_idx:
            word_idx = word_to_idx[word]
            tfidf_matrix[doc_idx, word_idx] = tf_val * idf_scores.get(word, 0)

print("=" * 60)
print("[TF-IDF Demo] 每篇文档的 Top-3 关键词:")
print("=" * 60)
for doc_idx, doc in enumerate(CORPUS):
    # 获取当前文档的 TF-IDF 向量
    doc_vec = tfidf_matrix[doc_idx]
    # 取出得分最高的 3 个词的索引
    top_indices = np.argsort(doc_vec)[::-1][:3]
    top_words = [f"{idx_to_word[i]}({doc_vec[i]:.3f})" for i in top_indices if doc_vec[i] > 0]
    print(f"  文档{doc_idx+1}: {doc[:25]}...")
    print(f"      关键词: {', '.join(top_words)}")
print()

# 可视化:TF-IDF 热力图(部分词汇)
print("[TF-IDF 可视化] 正在绘制热力图...")
fig, ax = plt.subplots(figsize=(16, 6))
# 选择 TF-IDF 总分最高的 30 个词来展示
word_totals = tfidf_matrix.sum(axis=0)
top_word_indices = np.argsort(word_totals)[::-1][:30]
top_words_viz = [idx_to_word[i] for i in top_word_indices]
top_tfidf = tfidf_matrix[:, top_word_indices]
im = ax.imshow(top_tfidf.T, aspect='auto', cmap='YlOrRd')
ax.set_xticks(range(len(CORPUS)))
ax.set_xticklabels([f"D{i+1}" for i in range(len(CORPUS))], rotation=45, fontsize=8)
ax.set_yticks(range(len(top_words_viz)))
ax.set_yticklabels(top_words_viz, fontsize=8)
ax.set_xlabel("Document ID", fontsize=12)
ax.set_ylabel("Keywords", fontsize=12)
ax.set_title("TF-IDF Heatmap: Documents x Keywords (Brighter = More Important)", fontsize=13, fontweight='bold')
plt.colorbar(im, ax=ax, shrink=0.8, label='TF-IDF Score')
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'tfidf_heatmap_demo.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[TF-IDF 可视化] 热力图已保存至 images/tfidf_heatmap_demo.png")
print()

# ============================================================
# 第三部分:word2vec Skip-gram + 负采样
# ============================================================

# ---------- 3.1 构建训练数据 ----------

def build_skipgram_pairs(
    tokenized_docs: List[List[str]],
    window_size: int = 2,
) -> List[Tuple[int, int]]:
    """
    构建 Skip-gram 训练对:(中心词索引, 上下文词索引)

    参数:
        tokenized_docs: 所有文档的分词列表
        window_size: 上下文窗口大小(单侧)
    返回:
        (中心词, 上下文词) 索引对的列表
    """
    pairs = []
    for doc in tokenized_docs:
        indices = [word_to_idx[w] for w in doc if w in word_to_idx]
        for i, center in enumerate(indices):
            # 遍历窗口内的上下文词
            for j in range(max(0, i - window_size), min(len(indices), i + window_size + 1)):
                if i != j:  # 不包含中心词自身
                    pairs.append((center, indices[j]))
    return pairs


# 构建训练对
skipgram_pairs = build_skipgram_pairs(tokenized_corpus, window_size=2)
print(f"[Skip-gram] 共生成 {len(skipgram_pairs)} 个训练对")
print()

# 计算词频的 3/4 次方作为负采样分布(word2vec 论文推荐的噪声分布)
word_freq = Counter(all_words)
word_freq_pow = {w: count ** 0.75 for w, count in word_freq.items()}
total_pow = sum(word_freq_pow.values())
noise_dist = np.array([word_freq_pow.get(w, 0) / total_pow for w in vocab])


class SkipGramDataset(Dataset):
    """
    Skip-gram 训练数据集,每个样本为 (中心词, 上下文词, 负样本列表)
    """

    def __init__(self, pairs: List[Tuple[int, int]], num_neg: int = 5, noise_dist: np.ndarray = None):
        """
        参数:
            pairs: (中心词, 上下文词) 列表
            num_neg: 每个正样本配多少个负样本
            noise_dist: 负采样概率分布
        """
        self.pairs = pairs
        self.num_neg = num_neg
        self.noise_dist = noise_dist
        self.V = len(noise_dist) if noise_dist is not None else 0

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        center, pos_context = self.pairs[idx]
        # 负采样:从噪声分布中随机采样,排除正样本
        neg_samples = []
        while len(neg_samples) < self.num_neg:
            neg = np.random.choice(self.V, p=self.noise_dist)
            if neg != pos_context and neg != center:
                neg_samples.append(neg)
        return (
            torch.tensor(center, dtype=torch.long),
            torch.tensor(pos_context, dtype=torch.long),
            torch.tensor(neg_samples, dtype=torch.long),
        )


# ---------- 3.2 Skip-gram 模型 ----------

class SkipGramNegSampling(nn.Module):
    """
    Skip-gram 模型 with Negative Sampling.

    参数:
        vocab_size: 词汇表大小 V
        embed_dim: 词向量维度 d(通常 50~300)
    """

    def __init__(self, vocab_size: int, embed_dim: int = 100):
        super().__init__()
        # 输入嵌入矩阵 W (V × d) —— 这就是训练后我们要保留的词向量
        self.in_embeddings = nn.Embedding(vocab_size, embed_dim)
        # 输出嵌入矩阵 W' (V × d) —— 辅助矩阵,训练后可丢弃
        self.out_embeddings = nn.Embedding(vocab_size, embed_dim)
        # 参数初始化:小随机值
        self.in_embeddings.weight.data.uniform_(-0.5 / embed_dim, 0.5 / embed_dim)
        self.out_embeddings.weight.data.uniform_(-0.5 / embed_dim, 0.5 / embed_dim)

    def forward(self, center_words: torch.Tensor, context_words: torch.Tensor, neg_words: torch.Tensor):
        """
        前向计算:负采样损失。

        参数:
            center_words: 中心词索引,shape (batch,)
            context_words: 正样本上下文词索引,shape (batch,)
            neg_words: 负样本词索引,shape (batch, num_neg)
        返回:
            loss: 负采样损失(标量)
        """
        batch_size = center_words.size(0)
        # 查表获取向量
        v_center = self.in_embeddings(center_words)       # (batch, d)
        u_pos = self.out_embeddings(context_words)          # (batch, d)
        u_neg = self.out_embeddings(neg_words)              # (batch, num_neg, d)

        # 正样本得分:v_center · u_pos → sigmoid → log
        pos_score = torch.sum(v_center * u_pos, dim=1)     # (batch,)
        pos_loss = F.logsigmoid(pos_score).mean()           # -log σ(v·u_pos),取负是因为 log_sigmoid

        # 负样本得分:v_center · u_neg → sigmoid(-score) → log
        neg_score = torch.bmm(u_neg, v_center.unsqueeze(2)).squeeze(2)  # (batch, num_neg)
        neg_loss = F.logsigmoid(-neg_score).sum(dim=1).mean()           # -log σ(-v·u_neg)

        # 总损失 = -(正样本损失 + 负样本损失)
        return -(pos_loss + neg_loss)


# ---------- 3.3 训练 ----------

def train_skipgram(
    model: SkipGramNegSampling,
    dataloader: DataLoader,
    epochs: int = 50,
    lr: float = 0.01,
    device: torch.device = None,
):
    """
    训练 Skip-gram 模型。

    参数:
        model: SkipGramNegSampling 模型实例
        dataloader: 训练数据加载器
        epochs: 训练轮数
        lr: 学习率
        device: 计算设备
    返回:
        loss_history: 每个 epoch 的平均损失列表
    """
    if device is None:
        device = DEVICE
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_history = []

    print(f"[Skip-gram 训练] 设备: {device}, Epochs: {epochs}, LR: {lr}")
    for epoch in range(epochs):
        total_loss = 0.0
        for batch_idx, (center, pos, neg) in enumerate(dataloader):
            center = center.to(device)
            pos = pos.to(device)
            neg = neg.to(device)
            optimizer.zero_grad()
            loss = model(center, pos, neg)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / max(len(dataloader), 1)
        loss_history.append(avg_loss)
        if (epoch + 1) % 10 == 0:
            print(f"  Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
    return loss_history


# 准备训练数据
dataset = SkipGramDataset(skipgram_pairs, num_neg=5, noise_dist=noise_dist)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# 创建并训练模型
embed_dim = 64  # 词向量维度
model = SkipGramNegSampling(V, embed_dim=embed_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
loss_history = train_skipgram(model, dataloader, epochs=80, lr=0.005, device=device)

# 训练完成后,提取输入嵌入矩阵作为最终的词向量
word_vectors = model.in_embeddings.weight.data.cpu().numpy()  # (V, embed_dim)
print(f"[Skip-gram] 词向量矩阵形状: {word_vectors.shape}")
print()

# ---------- 3.4 训练损失曲线 ----------

plt.figure(figsize=(8, 4))
plt.plot(loss_history, color='#2196F3', linewidth=1.5)
plt.xlabel("Epoch", fontsize=12)
plt.ylabel("Loss", fontsize=12)
plt.title("Skip-gram Negative Sampling Training Loss Curve", fontsize=13, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'skipgram_loss_curve.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[可视化] 训练损失曲线已保存至 images/skipgram_loss_curve.png")

# ============================================================
# 第四部分:词向量分析
# ============================================================

# ---------- 4.1 t-SNE 降维可视化 ----------

from sklearn.manifold import TSNE

print("[t-SNE] 正在进行降维可视化...(可能需要几秒)")
# 选择出现频率最高的 100 个词进行可视化
word_counts = Counter(all_words)
top_words = [w for w, _ in word_counts.most_common(100) if w in word_to_idx]
top_indices = [word_to_idx[w] for w in top_words]
top_vectors = word_vectors[top_indices]

# t-SNE 降维到 2D
tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(top_words) - 1), max_iter=500)
vectors_2d = tsne.fit_transform(top_vectors)

plt.figure(figsize=(16, 14))
plt.scatter(vectors_2d[:, 0], vectors_2d[:, 1], c='steelblue', alpha=0.6, s=50)
# 标注每个词的标签
for i, word in enumerate(top_words):
    plt.annotate(word, (vectors_2d[i, 0], vectors_2d[i, 1]),
                 fontsize=9, alpha=0.85,
                 bbox=dict(boxstyle='round,pad=0.2', facecolor='yellow', alpha=0.3))
plt.xlabel("t-SNE Dimension 1", fontsize=12)
plt.ylabel("t-SNE Dimension 2", fontsize=12)
plt.title("word2vec Embedding t-SNE Visualization (Top-100 Frequent Words)", fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(_IMAGES, 'word2vec_tsne.png'), dpi=150, bbox_inches='tight')
plt.close()
print("[t-SNE] 可视化已保存至 images/word2vec_tsne.png")
print()

# ---------- 4.2 近义词查询 ----------

def cosine_similarity(v1: np.ndarray, v2: np.ndarray) -> float:
    """
    计算两个向量的余弦相似度。

    参数:
        v1, v2: 两个同维度向量
    返回:
        余弦相似度,范围 [-1, 1]
    """
    dot = np.dot(v1, v2)
    norm = np.linalg.norm(v1) * np.linalg.norm(v2)
    if norm == 0:
        return 0.0
    return dot / norm


def find_nearest_neighbors(
    query_word: str,
    word_vectors: np.ndarray,
    word_to_idx: Dict[str, int],
    idx_to_word: Dict[int, str],
    top_k: int = 10,
) -> List[Tuple[str, float]]:
    """
    查找与查询词最相似的 top_k 个词。

    参数:
        query_word: 查询词
        word_vectors: 词向量矩阵 (V, d)
        word_to_idx: 词到索引的映射
        idx_to_word: 索引到词的映射
        top_k: 返回最近邻数量
    返回:
        [(词, 余弦相似度), ...] 列表
    """
    if query_word not in word_to_idx:
        return []
    query_idx = word_to_idx[query_word]
    query_vec = word_vectors[query_idx]
    # 计算与所有词的余弦相似度
    similarities = []
    for i in range(len(word_vectors)):
        if i != query_idx:
            sim = cosine_similarity(query_vec, word_vectors[i])
            similarities.append((idx_to_word[i], sim))
    # 按相似度降序排列
    similarities.sort(key=lambda x: x[1], reverse=True)
    return similarities[:top_k]


# 演示:查询若干个词的近邻
query_words = ["学", "球", "医", "教", "机", "经"]
print("=" * 60)
print("[近义词查询] word2vec 余弦相似度 Top-5")
print("=" * 60)
for qw in query_words:
    if qw in word_to_idx:
        neighbors = find_nearest_neighbors(qw, word_vectors, word_to_idx, idx_to_word, top_k=5)
        print(f"  「{qw}」的近邻: {', '.join([f'{w}({s:.3f})' for w, s in neighbors])}")
    else:
        print(f"  「{qw}」不在词汇表中")
print()

# ---------- 4.3 类比推理 ----------

def word_analogy(
    a: str, b: str, c: str,
    word_vectors: np.ndarray,
    word_to_idx: Dict[str, int],
    idx_to_word: Dict[int, str],
    top_k: int = 5,
) -> List[Tuple[str, float]]:
    """
    词类比推理:a - b + c ≈ ?  (如 国王 - 男人 + 女人 ≈ 女王)

    参数:
        a, b, c: 三个类比词
        word_vectors: 词向量矩阵
        word_to_idx, idx_to_word: 词-索引映射
        top_k: 返回 top-k 结果
    返回:
        [(词, 余弦相似度), ...] 列表
    """
    if a not in word_to_idx or b not in word_to_idx or c not in word_to_idx:
        return []
    # 计算类比向量
    result_vec = word_vectors[word_to_idx[a]] - word_vectors[word_to_idx[b]] + word_vectors[word_to_idx[c]]
    # 排除 a, b, c 本身
    exclude = {word_to_idx[a], word_to_idx[b], word_to_idx[c]}
    # 计算所有词的余弦相似度
    similarities = []
    for i in range(len(word_vectors)):
        if i not in exclude:
            sim = cosine_similarity(result_vec, word_vectors[i])
            similarities.append((idx_to_word[i], sim))
    similarities.sort(key=lambda x: x[1], reverse=True)
    return similarities[:top_k]


# 注:由于语料库较小,类比推理效果有限。这里演示方法。
# 在大规模语料上,king-man+woman≈queen 这种效果才会显著。
print("=" * 60)
print("[类比推理演示] word2vec 向量运算")
print("=" * 60)

# 尝试几个类比:由于语料小,结果可能不好,但展示方法
analogy_triples = [
    ("足", "篮", "游"),   # 足球 - 篮球 + 游泳 ≈ ?
]
for a, b, c in analogy_triples:
    results = word_analogy(a, b, c, word_vectors, word_to_idx, idx_to_word, top_k=5)
    if results:
        print(f"  {a} - {b} + {c} ≈  ?")
        for word, sim in results:
            print(f"    → {word} (相似度: {sim:.4f})")
print()

# ---------- 4.4 TF-IDF vs word2vec 相似度对比 ----------

print("=" * 60)
print("[对比] TF-IDF vs word2vec 词相似度")
print("=" * 60)

# 用 TF-IDF 计算"文档"间的相似度(不是词之间的)
# word2vec 可以计算词之间的相似度
# 这展示了两种方法的本质区别

# TF-IDF 文档相似度
def tfidf_cosine_similarity(doc1_idx: int, doc2_idx: int, tfidf_matrix: np.ndarray) -> float:
    """计算两篇文档的 TF-IDF 向量余弦相似度"""
    v1 = tfidf_matrix[doc1_idx]
    v2 = tfidf_matrix[doc2_idx]
    dot = np.dot(v1, v2)
    norm = np.linalg.norm(v1) * np.linalg.norm(v2)
    if norm == 0:
        return 0.0
    return dot / norm


# 显示部分文档之间的 TF-IDF 相似度
print("\nTF-IDF 文档相似度矩阵 (部分):")
for i in range(0, 10, 2):
    for j in range(i + 1, min(i + 3, len(CORPUS))):
        sim = tfidf_cosine_similarity(i, j, tfidf_matrix)
        print(f"  Doc{i+1} vs Doc{j+1}: {sim:.4f}  |  {CORPUS[i][:20]}... <-> {CORPUS[j][:20]}...")

print("\nword2vec 词级别相似度(可以计算任意两个词的相似度):")
# 对比"足"与"篮"(体育类应相似)vs "足"与"医"(不相关)
for w1, w2 in [("足", "篮"), ("学", "习"), ("足", "医"), ("机", "器")]:
    if w1 in word_to_idx and w2 in word_to_idx:
        sim = cosine_similarity(word_vectors[word_to_idx[w1]], word_vectors[word_to_idx[w2]])
        print(f"  sim('{w1}', '{w2}') = {sim:.4f}")

print()
print("=" * 60)
print("[核心对比总结]")
print("=" * 60)
print("  TF-IDF:   基于统计计数的稀疏表示,忽略词序,适合文档级任务")
print("  word2vec: 基于上下文的稠密表示,捕获语义,适合词级任务")
print("  TF-IDF 给出「文档向量」用于文档检索/分类")
print("  word2vec 给出「词向量」用于近义词查询/类比推理")
print()
print("所有 demo 运行完成!图表已保存至 images/ 目录。")