Skip to content

ml08 聚类 — exercise.py 练习指南

Download exercise.py

练习目标

通过实现 K-Means 单次迭代、轮廓系数计算、DBSCAN 区域查询以及综合 K 选择,从代码层面深入理解三种聚类算法的核心机制。

预备知识

在开始练习前,确保你已经理解了以下概念:

  • Lloyd 算法的两步交替过程:分配步(将每个点分配到最近质心)和更新步(将质心移到簇均值)
  • 欧氏距离平方在聚类中的核心地位:xμ2=(xdμd)2
  • 轮廓系数的数学定义:s(i)=b(i)a(i)max(a(i),b(i))
  • DBSCAN 的核心点判定逻辑:ε-邻域内至少有 minPts 个邻居
  • 肘部法则和轮廓系数分析如何帮助选择 K

任务清单

任务1:实现 K-Means 的单次迭代 kmeans_single_iteration(X, centroids)

  • 用到的公式
    • 分配:label[i]=argminkxiμk2
    • 更新:μk=1|Ck|xiCkxi
    • 惯性:J=k=1KxiCkxiμk2
  • 实现步骤
    1. cdist(X, centroids) 计算距离矩阵
    2. np.argmin(distances, axis=1) 得到标签
    3. 对每个 k,用 X[labels == k].mean(axis=0) 更新质心
    4. 计算惯性(每个点到其簇质心的距离平方和)
  • 需要调用的函数cdist(), np.argmin(), np.mean()
  • 返回(labels, new_centroids, inertia)
  • 期望输出:标签为 0/1/2 三类,新质心 3x2 矩阵,inertia 为正实数

任务2:实现轮廓系数计算 silhouette_score_manual(X, labels)

  • 算法流程
    1. 预计算距离矩阵 cdist(X, X)——shape (n, n)
    2. 对每个样本 i:
      • a(i):同簇其他样本的距离均值
      • b(i):最近的其他簇的距离均值(取 min)
      • s(i)=(ba)/max(a,b)
    3. 返回所有 s(i) 的均值
  • 实现提示
    • 同簇样本:mask = (labels == labels[i]),注意排除 i 自身
    • 其他簇平均距离:遍历所有 unique(labels),跳过 labels[i]
  • 需要调用的函数cdist(), np.unique(), np.mean()
  • 边界情况:如果只有一个簇或每个样本各成一簇,返回 0
  • 期望输出:范围 [1,1],对于分离良好的 blobs 数据应 (0.6,0.8)

任务3:实现 DBSCAN 区域查询 region_query(X, point_idx, eps)

  • 数学定义Nε(p)={q:pqε}
  • 实现步骤
    1. 计算 X[point_idx] 到 X 中所有点的距离
    2. 返回距离 ε 的点的索引
  • 需要调用的函数np.linalg.norm(), np.where()
  • 验证:对 [[0,0], [0.3,0.3], [0.5,0.5], [2,2]]eps=0.5,查询点 0 应返回 [0, 1]

任务4:综合选择最佳 K find_optimal_k(X, K_range)

  • 实现策略
    1. 对 K_range 中的每个 K,运行 sklearn 的 KMeans
    2. 记录 inertia 和轮廓系数
    3. 找轮廓系数最高的 K 作为最佳选择
    4. 打印所有结果表格
  • 需要调用的函数sklearn.cluster.KMeans, sklearn.metrics.silhouette_score
  • 思考题:轮廓系数和肘部法则在某些情况下可能给出不同建议——为什么?应该更信任哪个?

完整代码

py
# -*- coding: utf-8 -*-
"""
ml08 聚类 — 练习代码
====================
请完成以下 TODO 任务,巩固对 K-Means、DBSCAN 和聚类评估的理解。

每个 TODO 都有详细的指示和预期输出描述。
建议先阅读 index.md 和 code-demo.md,再尝试独立补全代码。
"""

import numpy as np
from sklearn.datasets import make_blobs
from scipy.spatial.distance import cdist


# ============================================================================
# TODO 1: 实现 K-Means 的单次迭代
# ============================================================================
def kmeans_single_iteration(X, centroids):
    """
    执行 K-Means 的一轮迭代(分配 + 更新)。

    数学公式:
        分配步: label[i] = argmin_k ||X[i] - centroids[k]||^2
        更新步: centroids[k] = mean_{i: label[i]=k} X[i]

    参数:
        X: 数据矩阵,shape (n_samples, d)
        centroids: 当前质心,shape (K, d)

    返回:
        labels: 分配结果,shape (n_samples,)
        new_centroids: 更新后的质心,shape (K, d)
        inertia: 簇内平方误差 J = sum sum ||x - mu||^2
    """
    K = centroids.shape[0]

    # TODO: 步骤 1 —— 计算每个点到所有质心的欧氏距离平方
    # 提示:使用 cdist(X, centroids, metric='euclidean') 或手动计算
    distances = None  # ← TODO: shape (n_samples, K)

    # TODO: 步骤 2 —— 分配步:每个点归属到距离最近的质心
    labels = None  # ← TODO: np.argmin(distances, axis=1)

    # TODO: 步骤 3 —— 更新步:计算每个簇的新质心 = 簇内点的均值
    new_centroids = np.zeros_like(centroids)
    for k in range(K):
        mask = None  # ← TODO: (labels == k)
        if mask.sum() > 0:
            new_centroids[k] = None  # ← TODO: X[mask].mean(axis=0)
        else:
            new_centroids[k] = centroids[k]  # 空簇保持原质心

    # TODO: 步骤 4 —— 计算 inertia(簇内平方误差)
    inertia = None  # ← TODO: 对每个簇 k,计算 sum(||X[labels==k] - centroids[k]||^2) 然后求和

    return labels, new_centroids, inertia


# TODO 1 测试
def test_kmeans_iteration():
    """测试 K-Means 单次迭代"""
    print("=" * 60)
    print("TODO 1 测试: K-Means 单次迭代")
    print("=" * 60)
    X, _ = make_blobs(n_samples=100, centers=3, random_state=42)
    # 随机初始化 3 个质心
    centroids = X[np.random.choice(len(X), 3, replace=False)]
    labels, new_centroids, inertia = kmeans_single_iteration(X, centroids)

    if labels is None:
        print("  TODO 1 未完成,请补全 kmeans_single_iteration 函数")
    else:
        print(f"  标签数: {len(labels)} (期望: 100)")
        print(f"  标签类型: {np.unique(labels)} (期望: [0, 1, 2])")
        print(f"  新质心形状: {new_centroids.shape} (期望: (3, 2))")
        print(f"  Inertia: {inertia:.2f}")
        # 验证惯性非负
        assert inertia >= 0, "Inertia 必须非负"


# ============================================================================
# TODO 2: 实现轮廓系数计算
# ============================================================================
def silhouette_score_manual(X, labels):
    """
    手动计算平均轮廓系数。

    对每个样本 i:
        a(i) = 样本 i 到同簇其他样本的平均距离
        b(i) = 样本 i 到最近的其他簇所有样本的平均距离
        s(i) = (b(i) - a(i)) / max(a(i), b(i))

    参数:
        X: 数据矩阵,shape (n_samples, d)
        labels: 聚类标签,shape (n_samples,)

    返回:
        float: 平均轮廓系数
    """
    n_samples = X.shape[0]
    unique_labels = np.unique(labels)
    n_clusters = len(unique_labels)

    if n_clusters == 1 or n_clusters == n_samples:
        return 0.0  # 边缘情况

    # 预计算所有点对距离矩阵(上三角部分)
    dist_matrix = cdist(X, X)  # (n, n)

    silhouette_vals = np.zeros(n_samples)

    for i in range(n_samples):
        label_i = labels[i]

        # TODO: 计算 a(i) —— 样本 i 到同簇其他样本的平均距离
        # 提示:用 (labels == label_i) 筛选同簇样本,排除 i 自身
        same_cluster_mask = None  # ← TODO
        a_i = None  # ← TODO: dist_matrix[i, same_cluster_mask].mean()

        # TODO: 计算 b(i) —— 样本 i 到最近的其他簇的平均距离
        # 提示:对每个不等于 label_i 的簇 label_k,计算 i 到该簇所有样本的平均距离,取最小值
        b_i = float('inf')
        for label_k in unique_labels:
            if label_k == label_i:
                continue
            other_cluster_mask = None  # ← TODO: (labels == label_k)
            avg_dist = None  # ← TODO: dist_matrix[i, other_cluster_mask].mean()
            if avg_dist < b_i:
                b_i = avg_dist

        # TODO: 计算 s(i)
        silhouette_vals[i] = None  # ← TODO: (b_i - a_i) / max(a_i, b_i)

    return silhouette_vals.mean()


# TODO 2 测试
def test_silhouette():
    """测试轮廓系数计算"""
    print("\n" + "=" * 60)
    print("TODO 2 测试: 轮廓系数计算")
    print("=" * 60)
    X, y = make_blobs(n_samples=100, centers=3, random_state=42)
    score = silhouette_score_manual(X, y)

    if score is None:
        print("  TODO 2 未完成,请补全 silhouette_score_manual 函数")
    else:
        print(f"  手动轮廓系数: {score:.4f} (应接近 0.6-0.8)")
        assert -1.0 <= score <= 1.0, "轮廓系数必须在 [-1, 1] 范围内"


# ============================================================================
# TODO 3: 实现 DBSCAN 的区域查询
# ============================================================================
def region_query(X, point_idx, eps):
    """
    查找点 point_idx 的 ε-邻域(欧氏距离 ≤ eps 的所有点)。

    参数:
        X: 数据矩阵,shape (n_samples, d)
        point_idx: 查询点的索引
        eps: 邻域半径

    返回:
        neighbors: 邻域内所有点的索引数组
    """
    # TODO: 计算所有点到 X[point_idx] 的欧氏距离
    # 提示: np.linalg.norm(X - X[point_idx], axis=1) 得到距离向量
    dists = None  # ← TODO: shape (n,)

    # TODO: 返回距离 ≤ eps 的点的索引
    neighbors = None  # ← TODO: np.where(dists <= eps)[0]
    return neighbors


# TODO 3 测试
def test_region_query():
    """测试区域查询"""
    print("\n" + "=" * 60)
    print("TODO 3 测试: DBSCAN 区域查询")
    print("=" * 60)
    X = np.array([[0., 0.], [0.3, 0.3], [0.5, 0.5], [2., 2.]])
    neighbors = region_query(X, 0, eps=0.5)

    if neighbors is None:
        print("  TODO 3 未完成,请补全 region_query 函数")
    else:
        print(f"  点0的eps邻域: {neighbors} (期望: [0, 1])")
        print(f"  邻域大小: {len(neighbors)} (期望: 2)")


# ============================================================================
# TODO 4: 选择最佳 K(综合肘部法则和轮廓系数)
# ============================================================================
def find_optimal_k(X, K_range=range(2, 9)):
    """
    综合肘部法则和轮廓系数,找到最佳 K 值。

    策略:
        1. 对每个 K 运行 K-Means,记录 inertia 和平均轮廓系数
        2. 找轮廓系数最高的 K
        3. 用肘部法则辅助验证
    提示:你可以 import 上面实现的 kmeans_single_iteration
         或者用 sklearn.cluster.KMeans

    返回:
        best_k_silhouette: 轮廓系数最高的 K
        results: dict {K: {'inertia': ..., 'silhouette': ...}}
    """
    # 这里可以使用 sklearn(简化迭代循环)
    from sklearn.cluster import KMeans as SKMeans
    from sklearn.metrics import silhouette_score as sk_sil

    results = {}
    print("\n" + "=" * 60)
    print("TODO 4 测试: 选择最佳 K")
    print("=" * 60)

    # 生成测试数据
    X, _ = make_blobs(n_samples=300, centers=4, random_state=42)

    for k in K_range:
        # TODO: 运行 K-Means 并记录结果
        km = None  # ← TODO: SKMeans(n_clusters=k, random_state=42).fit(X)
        inertia = None  # ← TODO: km.inertia_
        labels = None   # ← TODO: km.labels_
        sil = None      # ← TODO: sk_sil(X, labels)

        results[k] = {'inertia': inertia, 'silhouette': sil}
        print(f'  K={k}: inertia={inertia:.2f}, silhouette={sil:.4f}')

    # TODO: 找到轮廓系数最高的 K
    best_k = None  # ← 基于 silhouette 选择
    print(f'\n  最佳 K (轮廓系数): {best_k}')

    return best_k, results


# ============================================================================
# 主程序
# ============================================================================

if __name__ == '__main__':
    print("ml08 聚类 — 练习代码")
    print("请完成所有 TODO 标记的代码")
    print()

    test_kmeans_iteration()
    test_silhouette()
    test_region_query()
    find_optimal_k(X=np.random.randn(100, 2))  # 将在函数内生成测试数据

    print("\n全部测试完成!")