ml08 聚类 — demo.py 代码详解
运行方式
cd ml08_clustering/code
python demo.py代码逐段详解
第1步:K-Means 从零实现——Lloyd 算法
K-Means 的核心目标是最小化簇内平方误差(WCSS):
class KMeans:
def __init__(self, n_clusters=3, init='kmeans++', max_iter=300, tol=1e-4, random_state=None):n_clusters:簇的数量 K——K-Means 必须预先指定的唯一超参数init='kmeans++':使用 K-Means++ 算法初始化质心,避免陷入糟糕的局部最优max_iter=300:最大迭代次数(Lloyd 算法通常很快收敛,300 很充裕)tol=1e-4:收敛容忍度——质心移动距离小于 tol 时提前停止
K-Means++ 初始化
def _init_centroids(self, X):
centroids[0] = X[rng.randint(n_samples)] # 步骤1: 随机选第一个质心
for k in range(1, self.n_clusters):
dists = cdist(X, centroids[:k]) ** 2 # 到最近质心的距离平方
min_dists = dists.min(axis=1)
probs = min_dists / min_dists.sum() # 归一化为采样概率
centroids[k] = X[rng.choice(n_samples, p=probs)] # 按概率选下一个K-Means++ 的关键洞察:距离越远的点,越应该成为新质心。第 3-4 步骤通过概率采样实现这一点——
数学上,K-Means++ 保证了
Lloyd 迭代
def fit(self, X):
self.centroids_ = self._init_centroids(X)
for iteration in range(self.max_iter):
# 分配步: label[i] = argmin_k ||x_i - mu_k||
distances = cdist(X, self.centroids_) # (n, K)
labels = np.argmin(distances, axis=1) # 每行找到最小距离的列索引
# 更新步: mu_k = mean(x_i in C_k)
new_centroids = np.zeros_like(self.centroids_)
for k in range(self.n_clusters):
mask = (labels == k)
if mask.sum() > 0:
new_centroids[k] = X[mask].mean(axis=0)Lloyd 算法的两个步骤:
- 分配步(E 步):固定质心,将每个样本分配给最近的质心。这等价于在给定参数下推断隐变量(簇归属)。
- 更新步(M 步):固定分配,将每个质心移到其簇所有样本的均值。这等价于在给定隐变量下最大化似然。
第2步:DBSCAN 从零实现
DBSCAN 用完全不同的哲学定义簇:簇 = 密度相连的点的最大集合。
class DBSCAN:
def __init__(self, eps=0.5, min_samples=5):
self.eps = eps # 邻域半径 epsilon
self.min_samples = min_samples # 核心点的最小邻居数两个参数的含义:
eps():定义"附近"的距离阈值 min_samples(minPts):定义"密集"的计数阈值
一个点是核心点当且仅当其
区域查询
def _region_query(self, X, point_idx):
dists = np.linalg.norm(X - X[point_idx], axis=1) # 欧氏距离
return np.where(dists <= self.eps)[0] # <= eps 的邻居np.linalg.norm(X - X[point_idx], axis=1) 利用了 NumPy 的广播机制:X 的每一行减去 X[point_idx],再沿列方向计算 L2 范数,得到每个点到查询点的距离向量。
簇的扩展(类似 BFS)
seeds = list(neighbors)
while j < len(seeds):
current = seeds[j]
j += 1
# 如果是噪声点,重新标记为边界点
if labels[current] == -1:
labels[current] = cluster_id - 1
# 如果是核心点,将其邻居加入种子集
if len(current_neighbors) >= self.min_samples:
for nb in current_neighbors:
if nb not in seeds:
seeds.append(nb)这是 DBSCAN 最核心的逻辑——区域扩张:
- 从一个核心点开始,将其
-邻域内的所有点加入"种子集" - 逐个处理种子集中的点:如果某个种子点也是核心点,则将其邻域也加入种子集
- 直到种子集被耗尽——此时该簇中的所有点都已找到
这个过程类似于广度优先搜索(BFS)或洪水填充(flood fill)。
第3步:聚类结果对比可视化
def plot_clustering_comparison(X_list, titles, figsize=(18, 5)):在三个合成数据集上对比 K-Means 和 DBSCAN:
- Blobs(各向同性高斯团):K-Means 的理想场景,两种算法都应表现良好
- Moons(月牙形):K-Means 失败——用直线强行切分非凸形状
- Circles(同心圆):K-Means 完全失败——用扇形切分嵌套结构,DBSCAN 完美分离
关键洞察:K-Means 和 DBSCAN 的差异根源在于它们对"什么是簇"的哲学不同——K-Means 认为簇是"球形区域",DBSCAN 认为簇是"密度相连的点集"。
第4步:轮廓系数分析
def plot_silhouette_analysis(X, K_range=range(2, 7)):
sample_sil_values = silhouette_samples(X, labels)轮廓系数 silhouette_samples 返回每个样本的轮廓系数,silhouette_score 返回平均值。
在轮廓图中需要关注:
- 红色虚线:平均轮廓系数——越高越好
- 每个簇的"柱子"宽度:反映了簇的大小
- 柱子伸出虚线的情况:如果某簇的大部分样本在虚线左侧,说明聚类质量差
第5步:DBSCAN 参数影响分析
def plot_dbscan_parameter_effect(X, param_list):展示不同 (eps, min_samples) 组合对 DBSCAN 结果的影响:
- eps 太小:几乎所有点都是噪声(找不到足够的邻居)
- eps 太大:所有点合并为一个簇(丧失了区分能力)
- minPts 太小:过多的核心点,可能把噪声也聚类
- minPts 太大:过少的核心点,很多点被标记为噪声
参数选择的经验法则:
( 为数据维度),eps 通过 k-距离图的"拐点"确定。
第6步:肘部法则
def plot_inertia_vs_k(X, K_range=range(1, 11)):肘部法则是选择 K-Means 最佳 K 的经典方法。原理:
- 随着 K 增大,inertia(WCSS)必然单调递减(每个点离其质心越来越近)
- 但"真正"的簇数之后,递减速率会显著变缓——在曲线上形成一个"肘部"
- 选择肘部对应的 K 作为最佳簇数
数学直觉:inertia 曲线类似于 PCA 的累计解释方差曲线——增加的簇在"真实簇数"之后只能解释噪声方差,因此边际收益骤降。
第7步:聚类评估指标汇总
K-Means 在 4 个真实簇的数据上,不同 K 值下的指标:
- K=2:轮廓系数较低(欠聚类——合并了应该分开的簇)
- K=4:轮廓系数最高(正确——数据真实有 4 个簇)
- K=6:轮廓系数下降(过聚类——拆分了一个自然簇)
关键概念速查表
| 概念 | 数学形式 | 代码位置 | 关键说明 |
|---|---|---|---|
| WCSS | KMeans.fit() | K-Means 的最小化目标 | |
| Lloyd 迭代 | 分配步 + 更新步 | KMeans.fit() for loop | 交替优化,保证单调收敛 |
| K-Means++ | _init_centroids() | 概率采样选分散质心 | |
_region_query() | DBSCAN 的密度定义基础 | ||
| 核心点 | fit() 中判断 | 密集区域内部点 | |
| 密度可达链 | 核心点 → 核心点 → ... → 任意点 | seeds 循环 | 簇的连通性定义 |
| 轮廓系数 | silhouette_score() | 聚类质量内部评估 | |
| 肘部法则 | inertia vs K 曲线 | plot_inertia_vs_k() | 选择最佳 K 值 |
完整代码
# -*- coding: utf-8 -*-
"""
ml08 聚类 — 演示代码
====================
功能:从零实现 K-Means 和 DBSCAN,在合成数据集(blobs, moons, circles)上对比三种聚类算法,
并进行轮廓系数分析确定最佳 K 值。
每个函数都有中文 docstring,每行逻辑代码都有中文注释。
运行方式:在 ml08_clustering/ 目录下执行 python code/demo.py
"""
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False
from sklearn.datasets import make_blobs, make_moons, make_circles
from sklearn.metrics import silhouette_score, silhouette_samples
from sklearn.cluster import DBSCAN as SklearnDBSCAN
from scipy.spatial.distance import cdist
_HERE = os.path.dirname(os.path.abspath(__file__))
_IMAGES_DIR = os.path.join(_HERE, '..', 'images')
os.makedirs(_IMAGES_DIR, exist_ok=True)
# ============================================================================
# 第一部分:K-Means 从零实现
# ============================================================================
class KMeans:
"""
K-Means 聚类算法(Lloyd 算法 + K-Means++ 初始化)。
数学目标:最小化簇内平方误差
J = sum_{k=1}^{K} sum_{x_i in C_k} ||x_i - mu_k||^2
参数:
n_clusters: 簇的数量 K
init: 初始化方式,"random" 或 "kmeans++"
max_iter: 最大迭代次数
tol: 收敛容忍度(质心移动距离小于 tol 时提前停止)
random_state: 随机种子
"""
def __init__(self, n_clusters=3, init='kmeans++', max_iter=300, tol=1e-4, random_state=None):
self.n_clusters = n_clusters
self.init = init
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
self.centroids_ = None # 质心,shape (K, d)
self.labels_ = None # 每个样本的簇标签
self.inertia_ = None # 最终的目标函数值 J
self.n_iter_ = 0 # 实际迭代次数
def _init_centroids(self, X):
"""K-Means++ 或随机初始化质心"""
rng = np.random.RandomState(self.random_state)
n_samples = X.shape[0]
if self.init == 'random':
# 随机选择 K 个数据点作为初始质心
indices = rng.choice(n_samples, self.n_clusters, replace=False)
return X[indices].copy()
# K-Means++ 初始化
centroids = np.zeros((self.n_clusters, X.shape[1]))
# 步骤 1:均匀随机选第一个质心
centroids[0] = X[rng.randint(n_samples)]
for k in range(1, self.n_clusters):
# 计算每个点到最近已选质心的距离平方 D(x)^2
dists = cdist(X, centroids[:k]) ** 2 # (n_samples, k)
min_dists = dists.min(axis=1) # 每行的最小值
# 步骤 2-3:以 D(x)^2 为权重进行概率采样
probs = min_dists / min_dists.sum() # 归一化为概率
centroids[k] = X[rng.choice(n_samples, p=probs)]
return centroids
def fit(self, X):
"""执行 K-Means 聚类"""
self.centroids_ = self._init_centroids(X)
for iteration in range(self.max_iter):
# 分配步:计算每个点到所有质心的距离,分配到最近的质心
distances = cdist(X, self.centroids_) # (n, K)
labels = np.argmin(distances, axis=1) # (n,)
# 更新步:计算每个簇的新质心 = 簇内点的均值
new_centroids = np.zeros_like(self.centroids_)
for k in range(self.n_clusters):
mask = (labels == k)
if mask.sum() > 0:
new_centroids[k] = X[mask].mean(axis=0) # mu_k = mean(x_i in C_k)
else:
# 空簇处理:随机选一个点作为新质心
new_centroids[k] = X[np.random.randint(X.shape[0])]
# 检查收敛:质心移动距离 < tol?
centroid_shift = np.linalg.norm(new_centroids - self.centroids_, axis=1).max()
self.centroids_ = new_centroids
if centroid_shift < self.tol:
break
self.labels_ = labels
self.n_iter_ = iteration + 1
# 计算 inertia(簇内平方误差)
self.inertia_ = sum(
((X[self.labels_ == k] - self.centroids_[k]) ** 2).sum()
for k in range(self.n_clusters)
)
return self
# ============================================================================
# 第二部分:DBSCAN 从零实现
# ============================================================================
class DBSCAN:
"""
DBSCAN 密度聚类算法。
核心思想:簇是密度相连的点的最大集合。
参数:
eps: 邻域半径 ε
min_samples: 最小邻居数 minPts
metric: 距离度量,默认欧氏距离
"""
def __init__(self, eps=0.5, min_samples=5):
self.eps = eps
self.min_samples = min_samples
self.labels_ = None # 标签:-1 表示噪声点
self.core_sample_indices_ = None # 核心点的索引
def _region_query(self, X, point_idx):
"""查找点 point_idx 的 ε-邻域内所有点的索引"""
# 计算所有点到 point_idx 的距离
dists = np.linalg.norm(X - X[point_idx], axis=1) # (n,)
return np.where(dists <= self.eps)[0]
def fit(self, X):
"""执行 DBSCAN 聚类"""
n_samples = X.shape[0]
self.labels_ = np.full(n_samples, -1, dtype=int) # 初始全为噪声
cluster_id = 0 # 当前簇编号
visited = np.zeros(n_samples, dtype=bool) # 是否已访问
core_mask = np.zeros(n_samples, dtype=bool)
for i in range(n_samples):
if visited[i]:
continue
visited[i] = True
# 查询 ε-邻域
neighbors = self._region_query(X, i)
if len(neighbors) < self.min_samples:
# 暂时标记为噪声(后续可能被其他核心点"收编"为边界点)
continue
# 核心点:扩展新簇
core_mask[i] = True
cluster_id += 1
self.labels_[i] = cluster_id - 1
# 种子集:用于区域扩展(类似 BFS)
seeds = list(neighbors)
seeds.remove(i)
# 扩张簇:遍历种子集中的每个点
j = 0
while j < len(seeds):
current = seeds[j]
j += 1
if visited[current]:
# 已被访问过:如果是噪声,重新标记为边界点
if self.labels_[current] == -1:
self.labels_[current] = cluster_id - 1
continue
visited[current] = True
self.labels_[current] = cluster_id - 1
# 查询当前点的邻域
current_neighbors = self._region_query(X, current)
if len(current_neighbors) >= self.min_samples:
# 当前点也是核心点,将其邻域新点加入种子集
core_mask[current] = True
for nb in current_neighbors:
if nb not in seeds:
seeds.append(nb)
self.core_sample_indices_ = np.where(core_mask)[0]
return self
# ============================================================================
# 第三部分:可视化
# ============================================================================
def plot_clustering_comparison(X_list, titles, figsize=(18, 5)):
"""
对比 K-Means、DBSCAN 在三种合成数据集上的聚类结果。
参数:
X_list: [(X1, y1), (X2, y2), (X3, y3)] 数据集列表
titles: 每个数据集的名称列表
"""
algorithms = {
'K-Means (K=2)': lambda X: KMeans(n_clusters=2, random_state=42).fit(X).labels_,
'DBSCAN (eps=0.3)': lambda X: DBSCAN(eps=0.3, min_samples=5).fit(X).labels_,
}
n_datasets = len(X_list)
n_algos = len(algorithms)
fig, axes = plt.subplots(n_algos, n_datasets, figsize=figsize)
for col, (X, y_true, title) in enumerate(X_list):
for row, (algo_name, algo_fn) in enumerate(algorithms.items()):
ax = axes[row, col]
labels = algo_fn(X)
scatter = ax.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis',
s=20, alpha=0.8, edgecolors='k', linewidth=0.3)
ax.set_title(f'{algo_name}\n{title}', fontsize=11)
ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_xticks([])
ax.set_yticks([])
plt.tight_layout()
path = os.path.join(_IMAGES_DIR, 'ml08-05-clustering-comparison.png')
fig.savefig(path, dpi=150, bbox_inches='tight')
plt.close()
print(f'[保存] {path}')
def plot_silhouette_analysis(X, K_range=range(2, 7)):
"""
轮廓系数分析:画出不同 K 值下的轮廓图,帮助选择最佳 K。
参数:
X: 输入数据,shape (n, d)
K_range: 要测试的 K 值范围
"""
n_plots = len(K_range)
fig, axes = plt.subplots(2, n_plots, figsize=(4 * n_plots, 8))
for idx, k in enumerate(K_range):
# 运行 K-Means
km = KMeans(n_clusters=k, random_state=42).fit(X)
labels = km.labels_
silhouette_avg = silhouette_score(X, labels)
# 子图 1:轮廓图
ax1 = axes[0, idx]
sample_sil_values = silhouette_samples(X, labels)
y_lower = 10
for i in range(k):
ith_cluster_sil_values = sample_sil_values[labels == i]
ith_cluster_sil_values.sort()
size_cluster_i = ith_cluster_sil_values.shape[0]
y_upper = y_lower + size_cluster_i
color = plt.cm.viridis(i / k)
ax1.fill_betweenx(np.arange(y_lower, y_upper), 0, ith_cluster_sil_values,
facecolor=color, edgecolor=color, alpha=0.7)
ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))
y_lower = y_upper + 10
ax1.axvline(x=silhouette_avg, color='red', linestyle='--', linewidth=1.5)
ax1.set_title(f'K={k}\nAvg={silhouette_avg:.3f}')
ax1.set_xlabel('Silhouette Coefficient')
ax1.set_ylabel('Cluster')
ax1.set_xlim([-0.1, 1.0])
# 子图 2:聚类结果散点图
ax2 = axes[1, idx]
colors = plt.cm.viridis(labels / k)
ax2.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis',
s=20, alpha=0.8, edgecolors='k', linewidth=0.3)
ax2.scatter(km.centroids_[:, 0], km.centroids_[:, 1],
marker='X', c='red', s=100, edgecolors='black', linewidth=1)
ax2.set_title(f'K={k} Clusters')
ax2.set_xlabel('x1')
ax2.set_ylabel('x2')
plt.tight_layout()
path = os.path.join(_IMAGES_DIR, 'ml08-06-silhouette-analysis.png')
fig.savefig(path, dpi=150, bbox_inches='tight')
plt.close()
print(f'[保存] {path}')
def plot_dbscan_parameter_effect(X, param_list, figsize=(18, 10)):
"""展示不同 (eps, min_samples) 参数对 DBSCAN 结果的影响"""
n_params = len(param_list)
cols = 3
rows = (n_params + cols - 1) // cols
fig, axes = plt.subplots(rows, cols, figsize=figsize)
axes = axes.flatten()
for idx, (eps, min_samples) in enumerate(param_list):
ax = axes[idx]
db = DBSCAN(eps=eps, min_samples=min_samples).fit(X)
labels = db.labels_
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = list(labels).count(-1)
colors = plt.cm.tab10(labels % 10) if n_clusters > 0 else np.full((len(labels), 4), [0.5, 0.5, 0.5, 1.0])
ax.scatter(X[:, 0], X[:, 1], c=labels, cmap='tab10',
s=20, alpha=0.8, edgecolors='k', linewidth=0.3)
ax.set_title(f'eps={eps}, minPts={min_samples}\n'
f'Clusters: {n_clusters}, Noise: {n_noise}', fontsize=10)
ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_xticks([])
ax.set_yticks([])
# 隐藏多余的子图
for idx in range(n_params, len(axes)):
axes[idx].set_visible(False)
plt.tight_layout()
path = os.path.join(_IMAGES_DIR, 'ml08-07-dbscan-parameters.png')
fig.savefig(path, dpi=150, bbox_inches='tight')
plt.close()
print(f'[保存] {path}')
def plot_inertia_vs_k(X, K_range=range(1, 11)):
"""肘部法则:画出 inertia(WCSS)随 K 变化曲线"""
inertias = []
for k in K_range:
km = KMeans(n_clusters=k, random_state=42).fit(X)
inertias.append(km.inertia_)
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(list(K_range), inertias, 'bo-', markersize=8, linewidth=2)
ax.set_xlabel('Number of Clusters K')
ax.set_ylabel('Inertia (WCSS)')
ax.set_title('Elbow Method for Optimal K')
# 标出"肘部"位置
ax.axvline(x=4, color='red', linestyle='--', linewidth=1.5, alpha=0.7)
ax.annotate('Possible Elbow (K=4)', xy=(4, inertias[3]),
xytext=(5.5, inertias[3] * 1.3),
arrowprops=dict(arrowstyle='->', color='red'),
fontsize=11, color='red')
ax.grid(True, alpha=0.3)
plt.tight_layout()
path = os.path.join(_IMAGES_DIR, 'ml08-08-elbow-method.png')
fig.savefig(path, dpi=150, bbox_inches='tight')
plt.close()
print(f'[保存] {path}')
# ============================================================================
# 第四部分:主程序
# ============================================================================
def main():
print('=' * 60)
print('ml08 聚类 — 演示代码')
print('=' * 60)
# 1. 生成三个合成数据集
print('\n[1/6] 生成合成数据集...')
X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, random_state=42,
cluster_std=0.8)
X_moons, y_moons = make_moons(n_samples=300, noise=0.08, random_state=42)
X_circles, y_circles = make_circles(n_samples=300, noise=0.05, factor=0.5,
random_state=42)
datasets = [
(X_blobs, y_blobs, 'Blobs\n(spherical)'),
(X_moons, y_moons, 'Moons\n(non-convex)'),
(X_circles, y_circles, 'Circles\n(nested)'),
]
# 2. 对比 K-Means vs DBSCAN 在不同数据集上的效果
print('[2/6] 对比 K-Means vs DBSCAN...')
plot_clustering_comparison(datasets, [d[2] for d in datasets])
# 3. 轮廓系数分析(在 blobs 数据上演示)
print('[3/6] 轮廓系数分析...')
plot_silhouette_analysis(X_blobs, K_range=range(2, 7))
# 4. DBSCAN 参数影响
print('[4/6] DBSCAN 参数选择分析...')
param_list = [
(0.15, 3), (0.15, 5), (0.15, 10),
(0.30, 3), (0.30, 5), (0.30, 10),
]
plot_dbscan_parameter_effect(X_circles, param_list)
# 5. 肘部法则
print('[5/6] 肘部法则...')
plot_inertia_vs_k(X_blobs)
# 6. 打印核心指标
print('\n[6/6] 聚类评估指标汇总:')
print('-' * 50)
X = X_blobs # 以 blobs 数据为例
for k in range(2, 8):
km = KMeans(n_clusters=k, random_state=42).fit(X)
labels = km.labels_
sil = silhouette_score(X, labels)
print(f' K={k}: 轮廓系数={sil:.4f}, Inertia={km.inertia_:.2f}, '
f'迭代次数={km.n_iter_}')
if k == 4:
print(f' >>> 数据真实有 4 个簇,轮廓系数分析应在 K=4 附近表现最好')
print('\n完成!所有图表已保存到 images/ 目录。')
if __name__ == '__main__':
main()