Skip to content

ml06 集成学习:Bagging 与随机森林 — demo.py 代码详解

Download demo.py

运行方式

bash
cd ml06_random_forest/code
python demo.py

代码逐段详解

第1步:从 Bootstrap 到 Bagging

python
boot_idx = np.random.choice(n_samples, size=n_samples, replace=True)
X_boot, y_boot = X[boot_idx], y[boot_idx]

np.random.choice 的核心参数是 replace=True(有放回采样)。这意味着:

  • 每个样本每次被选中的概率都是 1/n
  • 一个样本可能被选中多次(在 X_boot 中出现多次)
  • 一个样本可能一次都不被选中(成为 OOB 样本)

数学上,某样本不被选中的概率:

P=(11n)ne10.368

这就是为什么 OOB 样本约占总体的 36.8%。

第2步:随机特征子空间

python
if self.max_features is None:
    features_to_use = list(range(n_features))
else:
    m = min(self.max_features, n_features)
    features_to_use = np.random.choice(n_features, size=m, replace=False)

这是随机森林区别于普通 Bagging 的核心机制。在决策树的每个分裂节点(不是每棵树!),只随机选择 m 个特征来评估:

  • m=d:Breiman 推荐的分类默认值
  • m=d/3:回归的推荐值
  • m=d(使用全部特征):退化为普通 Bagging

这种随机化迫使各树使用不同的特征进行分裂,从而降低树之间的相关性 ρ。根据集成方差公式 Varρσ2(当 B),ρ 越小,集成方差越低。

第3步:多数投票

python
def predict(self, X):
    all_preds = np.array([tree.predict(X) for tree in self.trees])
    predictions = scipy_stats.mode(all_preds, axis=0, keepdims=False)[0]
    return predictions.ravel()

scipy.stats.mode 对每个测试样本(axis=0)找出在 n_estimators 棵树中出现次数最多的类别。如果不使用 scipy,也可以用:

python
# 等价实现(手动投票)
predictions = [np.bincount(all_preds[:, i]).argmax() for i in range(n_samples)]

第4步:OOB 误差 — 免费的验证集

python
def oob_score(self, X, y):
    for i in range(n_samples):
        tree_indices = [t for t in range(self.n_estimators)
                        if i not in self.bootstrap_indices[t]]
        votes = [self.trees[t].predict(X[i:i+1])[0] for t in tree_indices]
        oob_predictions[i] = scipy_stats.mode(votes)[0]

OOB 的核心逻辑:

  1. 对于每个样本 i,检查它不在哪些树的 Bootstrap 抽样中
  2. 只用这些树(它们"没见过"样本 i)来预测样本 i
  3. 所有树对该样本的平均约 36.8% 未使用该样本

OOB 误差通常能很好地近似测试集误差——它和交叉验证一样是"用未见过模型的数据来评估",但不需要额外的计算开销(因为这些树本来就"免费地"未见过某些数据)。

第5步:三路对比 — 单树 vs Bagging vs 随机森林

plot_single_vs_rf_boundary() 函数并排对比了三种模型在同一数据上的决策边界:

  1. 单棵决策树max_depth=None):边界非常复杂,布满锯齿。训练精度高,测试精度低——过拟合。
  2. Baggingmax_features='all'):边界明显平滑,但仍有一些不规则的锯齿。方差有所降低但树之间仍有较高相关性。
  3. 随机森林max_features='sqrt'):边界最平滑,区域最规整。特征子空间随机化进一步降低了树间相关性。

关键概念速查表

概念公式/描述代码位置关键说明
Bootstrapnp.random.choice(n, n, replace=True)fit()有放回采样
OOB 概率(11/n)n36.8%oob_score()自然验证集
随机特征子空间np.random.choice(d, m, replace=False)_best_split()降低树间相关性
多数投票scipy.stats.modepredict()分类的聚合方式
集成方差ρσ2+(1ρ)σ2/B理论ρ 越低越好
MDI 重要性加权不纯度减少理论特征对纯度的贡献

完整代码

py
# -*- coding: utf-8 -*-
"""
===============================================================================
ml06_random_forest/code/demo.py — Bagging 与随机森林
===============================================================================
本演示从零实现 Bagging 和随机森林,基于 ml05 的 CART 决策树:
  1. Bootstrap 采样的实现和效果演示
  2. Bagging 分类器(多棵树 + 投票)
  3. 随机森林(Bagging + 随机特征子空间)
  4. OOB 误差估计
  5. 特征重要性(MDI)的计算与可视化
  6. 单决策树 vs Bagging vs 随机森林的全面对比
  7. 与 sklearn RandomForestClassifier 对比验证

通过本演示,你将理解:
  - Bootstrap 采样如何产生训练集的多样性
  - 随机特征子空间如何降低树间相关性
  - 为什么随机森林的泛化性能优于单棵决策树
  - OOB 误差如何作为免费的泛化能力估计

作者:learn-ai 项目
日期:2025
===============================================================================
"""

import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from scipy import stats as scipy_stats
from sklearn.datasets import make_classification, make_moons
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier as SkRandomForest
from sklearn.metrics import accuracy_score
matplotlib.rcParams['axes.unicode_minus'] = False

_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_IMAGES_DIR = os.path.join(_SCRIPT_DIR, '..', 'images')
os.makedirs(_IMAGES_DIR, exist_ok=True)

def _save_path(filename):
    return os.path.join(_IMAGES_DIR, filename)


# ============================================================================
# 第一部分:从 ml05 复用的 CART 决策树
# ============================================================================

def gini(y):
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)
    return 1.0 - np.sum(probs ** 2)


class DecisionTree:
    """CART 决策树(简化版,用于集成)。"""

    class Node:
        __slots__ = ('feature_idx', 'threshold', 'left', 'right', 'value', 'is_leaf')
        def __init__(self, value=None):
            self.feature_idx = None
            self.threshold = None
            self.left = None
            self.right = None
            self.value = value
            self.is_leaf = True

    def __init__(self, max_depth=None, max_features=None):
        self.max_depth = max_depth
        self.max_features = max_features  # None = use all features
        self.root = None

    def fit(self, X, y):
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.int64)
        self.n_features_ = X.shape[1]
        self.root = self._build_tree(X, y, depth=0)
        return self

    def _build_tree(self, X, y, depth):
        n_samples = len(y)

        if len(np.unique(y)) == 1:
            return self.Node(value=y[0])
        if self.max_depth is not None and depth >= self.max_depth:
            return self.Node(value=np.bincount(y).argmax())
        if n_samples < 2:
            return self.Node(value=np.bincount(y).argmax())

        best_feat, best_thresh, best_gain = self._best_split(X, y)
        if best_feat is None or best_gain <= 0:
            return self.Node(value=np.bincount(y).argmax())

        left_mask = X[:, best_feat] <= best_thresh
        right_mask = ~left_mask

        if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
            return self.Node(value=np.bincount(y).argmax())

        node = self.Node()
        node.is_leaf = False
        node.feature_idx = best_feat
        node.threshold = best_thresh
        node.left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        node.right = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return node

    def _best_split(self, X, y):
        n_samples, n_features = X.shape
        parent_impurity = gini(y)
        best_gain = -np.inf
        best_feat = None
        best_thresh = None

        # 随机特征子空间
        if self.max_features is None:
            features_to_use = list(range(n_features))
        else:
            m = min(self.max_features, n_features)
            features_to_use = np.random.choice(n_features, size=m, replace=False)

        for feat_idx in features_to_use:
            feature_values = X[:, feat_idx]
            sorted_indices = np.argsort(feature_values)
            sorted_X = feature_values[sorted_indices]
            sorted_y = y[sorted_indices]

            for i in range(n_samples - 1):
                if sorted_X[i] == sorted_X[i + 1]:
                    continue
                threshold = (sorted_X[i] + sorted_X[i + 1]) / 2.0
                y_left = sorted_y[:i + 1]
                y_right = sorted_y[i + 1:]
                n_left, n_right = len(y_left), len(y_right)
                child_impurity = (n_left/n_samples)*gini(y_left) + \
                                (n_right/n_samples)*gini(y_right)
                gain = parent_impurity - child_impurity
                if gain > best_gain:
                    best_gain = gain
                    best_feat = feat_idx
                    best_thresh = threshold

        return best_feat, best_thresh, best_gain

    def predict(self, X):
        X = np.asarray(X, dtype=np.float64)
        return np.array([self._traverse(x, self.root) for x in X])

    def _traverse(self, x, node):
        if node.is_leaf:
            return node.value
        if x[node.feature_idx] <= node.threshold:
            return self._traverse(x, node.left)
        else:
            return self._traverse(x, node.right)


# ============================================================================
# 第二部分:随机森林(Bagging + 随机特征子空间)
# ============================================================================

class RandomForest:
    """
    随机森林分类器。

    核心机制:
      1. Bootstrap 采样: 每棵树用不同的 Bootstrap 子集训练
      2. 随机特征子空间: 每个分裂节点只考虑 max_features 个随机特征
      3. 多数投票: 所有树的预测结果取众数

    参数:
        n_estimators: int, 树的数量
        max_depth: int or None, 每棵树的最大深度
        max_features: int or str, 每次分裂考虑的特征数
                      ('sqrt' = sqrt(n_features), 'all' = n_features)
    """

    def __init__(self, n_estimators=100, max_depth=None, max_features='sqrt'):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.trees = []
        self.bootstrap_indices = []  # 记录每棵树的 Bootstrap 采样索引

    def fit(self, X, y):
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.int64)
        n_samples, n_features = X.shape

        # 确定每次分裂的特征数
        if self.max_features == 'sqrt':
            m = int(np.sqrt(n_features))
        elif self.max_features == 'all':
            m = n_features
        else:
            m = min(self.max_features, n_features)

        self.trees = []
        self.bootstrap_indices = []

        for i in range(self.n_estimators):
            # Bootstrap 采样 (有放回)
            boot_idx = np.random.choice(n_samples, size=n_samples, replace=True)
            X_boot, y_boot = X[boot_idx], y[boot_idx]

            # 训练单棵树
            tree = DecisionTree(max_depth=self.max_depth, max_features=m)
            tree.fit(X_boot, y_boot)

            self.trees.append(tree)
            self.bootstrap_indices.append(boot_idx)

        return self

    def predict(self, X):
        """多数投票。"""
        X = np.asarray(X, dtype=np.float64)
        # 收集所有树的预测 (n_estimators, n_samples)
        all_preds = np.array([tree.predict(X) for tree in self.trees])
        # 对每个样本,取出现次数最多的类别
        predictions = scipy_stats.mode(all_preds, axis=0, keepdims=False)[0]
        return predictions.ravel()

    def oob_score(self, X, y):
        """
        计算 OOB (Out-of-Bag) 准确率。

        对于每个样本 i,找到所有未使用样本 i 的树,用这些树的多数投票预测。
        """
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.int64)
        n_samples = len(y)

        oob_predictions = np.zeros(n_samples, dtype=np.int64)
        oob_covered = np.zeros(n_samples, dtype=bool)

        for i in range(n_samples):
            # 找到所有未使用样本 i 的树
            tree_indices = []
            for t_idx in range(self.n_estimators):
                if i not in self.bootstrap_indices[t_idx]:
                    tree_indices.append(t_idx)

            if len(tree_indices) == 0:
                continue

            oob_covered[i] = True
            # 只用这些树做预测
            votes = [self.trees[t].predict(X[i:i+1])[0] for t in tree_indices]
            oob_predictions[i] = scipy_stats.mode(votes, keepdims=False)[0]

        # 只对有 OOB 覆盖的样本计算准确率
        if np.sum(oob_covered) == 0:
            return 0.0
        return accuracy_score(y[oob_covered], oob_predictions[oob_covered])


# ============================================================================
# 第三部分:可视化函数
# ============================================================================

def plot_bootstrap_demo():
    """展示 Bootstrap 采样的效果。"""
    np.random.seed(42)
    n_samples = 100

    # 生成一些示例数据
    original = np.arange(n_samples)

    # 三次 Bootstrap 采样
    fig, axes = plt.subplots(1, 3, figsize=(15, 4.5))
    titles = ['Bootstrap Sample 1', 'Bootstrap Sample 2', 'Bootstrap Sample 3']

    for ax, title in zip(axes, titles):
        boot_idx = np.random.choice(n_samples, size=n_samples, replace=True)
        # 统计每个样本被选中的次数
        counts = np.bincount(boot_idx, minlength=n_samples)
        # 统计 OOB 样本
        oob_mask = counts == 0

        colors = np.where(counts > 0, 'steelblue', 'lightgray')
        ax.bar(original, counts, color=colors, alpha=0.7)
        ax.axhline(y=1, color='red', linestyle='--', alpha=0.5, linewidth=1)
        n_oob = np.sum(oob_mask)
        ax.set_title(f'{title}\nSelected: {n_samples - n_oob}, OOB: {n_oob} ({n_oob/n_samples*100:.1f}%)',
                    fontsize=11, fontweight='bold')
        ax.set_xlabel('Sample Index', fontsize=9)
        ax.set_ylabel('Selection Count', fontsize=9)

    plt.tight_layout()
    fp = _save_path('bootstrap_demo.png')
    fig.savefig(fp, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[Done] bootstrap demo saved to {fp}")


def plot_single_vs_rf_boundary():
    """对比单决策树 vs Bagging vs 随机森林的决策边界。"""
    np.random.seed(42)
    X, y = make_moons(n_samples=300, noise=0.25, random_state=42)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # 单棵树
    single_tree = DecisionTree(max_depth=None)
    single_tree.fit(X_train, y_train)

    # Bagging(无随机特征子空间)
    bagging = RandomForest(n_estimators=50, max_depth=None, max_features='all')
    bagging.fit(X_train, y_train)

    # 随机森林(有随机特征子空间)
    rf = RandomForest(n_estimators=50, max_depth=None, max_features='sqrt')
    rf.fit(X_train, y_train)

    models = [
        (single_tree, 'Single Decision Tree\n(High Variance)'),
        (bagging, 'Bagging (50 trees, all features)\n(Bootstrap only)'),
        (rf, 'Random Forest (50 trees, sqrt features)\n(Bootstrap + Feature Sampling)'),
    ]

    fig, axes = plt.subplots(1, 3, figsize=(18, 5.5))

    for ax, (model, title) in zip(axes, models):
        x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
        y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5
        xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                            np.linspace(y_min, y_max, 200))
        grid = np.c_[xx.ravel(), yy.ravel()]
        Z = model.predict(grid).reshape(xx.shape)

        ax.contourf(xx, yy, Z, alpha=0.3, cmap=plt.cm.RdYlBu)
        ax.scatter(X_test[:, 0], X_test[:, 1], c=y_test, edgecolors='k',
                  cmap=plt.cm.RdYlBu, s=30)

        test_acc = accuracy_score(y_test, model.predict(X_test))
        ax.set_title(f'{title}\nTest Acc = {test_acc:.3f}', fontsize=11, fontweight='bold')
        ax.set_xlabel('Feature 1', fontsize=9)
        ax.set_ylabel('Feature 2', fontsize=9)

    plt.tight_layout()
    fp = _save_path('single_vs_rf_boundary.png')
    fig.savefig(fp, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[Done] single vs RF boundary saved to {fp}")


def plot_oob_error_curve():
    """绘制 OOB 误差随树数量变化的曲线。"""
    np.random.seed(42)
    X, y = make_classification(n_samples=500, n_features=10,
                                n_informative=5, n_redundant=2,
                                random_state=42)

    n_estimators_list = [5, 10, 20, 30, 50, 75, 100, 150]

    oob_scores = []
    train_scores = []
    test_scores = []

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    for n_est in n_estimators_list:
        rf = RandomForest(n_estimators=n_est, max_depth=None, max_features='sqrt')
        rf.fit(X_train, y_train)
        oob_scores.append(rf.oob_score(X_train, y_train))
        train_scores.append(accuracy_score(y_train, rf.predict(X_train)))
        test_scores.append(accuracy_score(y_test, rf.predict(X_test)))

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(n_estimators_list, oob_scores, 'go-', linewidth=2, markersize=8,
            label='OOB Score (free validation)')
    ax.plot(n_estimators_list, train_scores, 'bs-', linewidth=2, markersize=8,
            label='Train Score')
    ax.plot(n_estimators_list, test_scores, 'r^--', linewidth=2, markersize=8,
            markerfacecolor='none', label='Test Score')

    ax.set_xlabel('Number of Trees (n_estimators)', fontsize=12)
    ax.set_ylabel('Accuracy', fontsize=12)
    ax.set_title('OOB Error vs Tree Count in Random Forest\n(OOB approx. tracks test accuracy)',
                 fontsize=13, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    fp = _save_path('oob_error_curve.png')
    fig.savefig(fp, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[Done] OOB error curve saved to {fp}")


def plot_sklearn_comparison():
    """与 sklearn RandomForestClassifier 对比。"""
    np.random.seed(42)
    X, y = make_classification(n_samples=300, n_features=5,
                                n_informative=3, n_redundant=1,
                                random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42
    )

    n_estimators_list = [5, 10, 20, 30, 50]
    custom_scores = []
    sklearn_scores = []

    for n_est in n_estimators_list:
        # 自定义
        rf_custom = RandomForest(n_estimators=n_est, max_depth=None, max_features='sqrt')
        rf_custom.fit(X_train, y_train)
        custom_scores.append(accuracy_score(y_test, rf_custom.predict(X_test)))

        # sklearn
        rf_sk = SkRandomForest(n_estimators=n_est, max_depth=None,
                               max_features='sqrt', random_state=42)
        rf_sk.fit(X_train, y_train)
        sklearn_scores.append(accuracy_score(y_test, rf_sk.predict(X_test)))

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(n_estimators_list, custom_scores, 'bo-', linewidth=2, markersize=8,
            label='Custom Random Forest (NumPy)')
    ax.plot(n_estimators_list, sklearn_scores, 'rs--', linewidth=2, markersize=8,
            markerfacecolor='none', label='Sklearn RandomForest')
    ax.set_xlabel('Number of Trees', fontsize=12)
    ax.set_ylabel('Test Accuracy', fontsize=12)
    ax.set_title('Custom RF vs Sklearn RandomForest', fontsize=13, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    fp = _save_path('sklearn_rf_comparison.png')
    fig.savefig(fp, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[Done] sklearn comparison saved to {fp}")


# ============================================================================
# 主函数
# ============================================================================

def main():
    print("=" * 70)
    print("ml06_random_forest/demo.py — Bagging 与随机森林")
    print("=" * 70)

    print("\n[1/4] Bootstrap 采样演示...")
    plot_bootstrap_demo()

    print("\n[2/4] 单树 vs Bagging vs 随机森林决策边界...")
    plot_single_vs_rf_boundary()

    print("\n[3/4] OOB 误差曲线...")
    plot_oob_error_curve()

    print("\n[4/4] 与 sklearn 对比...")
    plot_sklearn_comparison()

    print("\n" + "=" * 70)
    print("全部可视化完成! 图像保存在:", _IMAGES_DIR)
    print("=" * 70)


if __name__ == "__main__":
    main()