Skip to content

ml11 隐马尔可夫模型 — demo.py 代码详解

Download demo.py

运行方式

bash
cd ml11_hmm/code
python demo.py

代码逐段详解

第1步:HMM 类与前向算法

python
class HMM:
    def __init__(self, A, B, pi):
        self.A = np.array(A)    # 转移矩阵: P(Z_{t+1}=j | Z_t=i)
        self.B = np.array(B)    # 发射矩阵: P(X_t=o | Z_t=i)
        self.pi = np.array(pi)  # 初始分布: P(Z_1=i)

HMM 的三个参数对应三个概率分布:初始状态分布 πN 维向量)、状态转移矩阵 AN×N)、发射概率矩阵 BN×M)。

前向算法(带缩放)

python
def forward(self, observations):
    alphas = np.zeros((T, self.N))
    c = np.zeros(T)

    # 初始化: α_1(i) = π_i · B[i, x_1]
    for i in range(self.N):
        alphas[0, i] = self.pi[i] * self.B[i, observations[0]]
    c[0] = alphas[0].sum()
    alphas[0] /= c[0]  # 缩放防止下溢

    # 递推: α_t(j) = [Σ_i α_{t-1}(i) · A[i, j]] · B[j, x_t]
    for t in range(1, T):
        for j in range(self.N):
            alphas[t, j] = np.dot(alphas[t-1, :], self.A[:, j]) * self.B[j, observations[t]]
        c[t] = alphas[t].sum()
        alphas[t] /= c[t]

    log_prob = np.sum(np.log(c + 1e-300))

为什么需要缩放(Scaling)? 当序列长度 T 较大时,αt(i) 的值会急剧变小(每个因子 1,连乘 T 次)。不加缩放的 αT 可能下溢到 0。缩放的技巧是每步除以 ct=iαt(i),使得每步的 α~t 之和为 1。观测概率的对数可以通过缩放因子恢复:logP(X|λ)=t=1Tlogct

为什么这有效?α~t=αt/(s=1tcs),则 s=1Tcs=iαT(i)=P(X|λ),取对数即为 logct

第2步:Viterbi 算法

python
def viterbi(self, observations):
    # 使用对数空间防止下溢
    deltas[0, i] = np.log(self.pi[i] + eps) + np.log(self.B[i, o1] + eps)

    for t in range(1, T):
        for j in range(self.N):
            candidates = deltas[t-1, :] + np.log(self.A[:, j] + eps)
            best_i = np.argmax(candidates)
            deltas[t, j] = candidates[best_i] + np.log(self.B[j, ot] + eps)
            psi[t, j] = best_i  # 回溯指针

Viterbi 与前向算法的关键区别:max 代替 sum。这一步体现了不同的目标——前向算法问"所有路径的总概率是多少",Viterbi 问"哪一条路径最可能"。

对数空间中使用加法律(log(ab)=loga+logb)替代乘法律,数值更稳定。特殊地,max 操作在 log 空间保持不变。

回溯

python
best_path[T-1] = best_final_state
for t in range(T-2, -1, -1):
    best_path[t] = psi[t+1, best_path[t+1]]

从最后时刻的最优状态出发,利用 ψt(j) 存储的回溯指针,逆向重构整个最优路径。这就是经典的动态规划回溯技术。

第3步:POS 标注格子图可视化

格子的 x 轴是时间(观测序列),y 轴是隐藏状态。两个热度图对比了:

  • 左图(Forward α:展示了在每个时刻处于每个状态的概率(边际化所有路径后)
  • 右图(Viterbi δ + 路径):展示了最优路径的构建过程——蓝色箭头连接最优序列

第4步:马尔可夫链转移图

展示了三个状态的天气马尔可夫链(Sunny/Cloudy/Rainy),箭头粗细正比于转移概率。通过迭代 π(n+1)=π(n)A 或求解 πA=π 得到平稳分布。

第5步:转移矩阵对状态序列的影响

对比两种极端的转移矩阵:

  • 平滑转移:高自转移概率(0.7),状态倾向保持稳定
  • 尖峰转移:强制循环结构(S1→S2→S3→S1),状态快速切换

Viterbi 解码出的路径反映了转移矩阵的结构——HMM 不可能解码出不遵守转移规律的路径。

关键概念速查表

概念数学形式代码位置关键说明
前向 ααt(j)=[αt1aij]bj(xt)HMM.forward()动态规划 O(N^2 T)
缩放α~t=αt/ctc[t] 归一化防止概率下溢
Viterbi δδt(j)=maxi[δt1aij]bj(xt)HMM.viterbi()max 代替 sum
回溯 ψψt(j)=argmaxi[δt1(i)aij]psi[t, j]记录最优前驱
平稳分布π=πAnp.linalg.eig(A.T)链的平衡状态
log 空间log(ab)=loga+logbnp.log(...)数值稳定替代乘法律

完整代码

py
# -*- coding: utf-8 -*-
"""
ml11 隐马尔可夫模型 (HMM) — 演示代码
=====================================
功能:从零实现 HMM 的前向算法(评估)、Viterbi 算法(解码),
      展示 POS 词性标注的格子图(trellis)和状态序列。

每个函数都有中文 docstring,每行逻辑代码都有中文注释。
运行方式:在 ml11_hmm/ 目录下执行 python code/demo.py
"""

import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False
from matplotlib.patches import FancyArrowPatch

_HERE = os.path.dirname(os.path.abspath(__file__))
_IMAGES_DIR = os.path.join(_HERE, '..', 'images')
os.makedirs(_IMAGES_DIR, exist_ok=True)


# ============================================================================
# 第一部分:HMM 类定义
# ============================================================================

class HMM:
    """
    隐马尔可夫模型(离散观测)。

    参数 λ = (A, B, pi):
        A[i, j] = P(Z_{t+1}=j | Z_t=i)  — 状态转移矩阵 (N, N)
        B[i, o] = P(X_t=o | Z_t=i)       — 发射概率矩阵 (N, M)
        pi[i]   = P(Z_1=i)                — 初始状态分布 (N,)

    属性:
        N: 隐藏状态数
        M: 观测符号数(词汇量)
    """

    def __init__(self, A, B, pi):
        self.A = np.array(A)    # (N, N)
        self.B = np.array(B)    # (N, M)
        self.pi = np.array(pi)  # (N,)
        self.N = len(pi)
        self.M = B.shape[1]

    def forward(self, observations):
        """
        前向算法:计算 P(X | λ)。

        递推公式:
            α_1(i) = π_i · B[i, x_1]
            α_t(j) = [Σ_i α_{t-1}(i) · A[i, j]] · B[j, x_t]
            P(X | λ) = Σ_i α_T(i)

        使用缩放(scaling)防止下溢:
            c_t = Σ_i α_t(i)
            α̃_t(i) = α_t(i) / c_t
            log P = Σ_t log c_t

        参数:
            observations: 观测序列(索引列表),长度 T

        返回:
            log_prob: log P(X | λ)
            alphas: 缩放后的 α 矩阵 (T, N),α[t, i] = α̃_{t+1}(i)
            c: 每步的缩放因子 (T,)
        """
        T = len(observations)
        alphas = np.zeros((T, self.N))  # (T, N)
        c = np.zeros(T)                  # 缩放因子

        # 初始化 t=1
        o1 = observations[0]
        for i in range(self.N):
            alphas[0, i] = self.pi[i] * self.B[i, o1]
        c[0] = alphas[0].sum()
        if c[0] > 0:
            alphas[0] /= c[0]

        # 递推 t = 2, ..., T
        for t in range(1, T):
            ot = observations[t]
            for j in range(self.N):
                # α_t(j) = [Σ_i α_{t-1}(i) * A[i, j]] * B[j, o_t]
                alphas[t, j] = np.dot(alphas[t-1, :], self.A[:, j]) * self.B[j, ot]
            c[t] = alphas[t].sum()
            if c[t] > 0:
                alphas[t] /= c[t]

        log_prob = np.sum(np.log(c + 1e-300))  # 防止 log(0)
        return log_prob, alphas, c

    def viterbi(self, observations):
        """
        Viterbi 算法:找到最可能的状态序列。

        递推:
            δ_1(i) = π_i · B[i, x_1]
            δ_t(j) = max_i [δ_{t-1}(i) · A[i, j]] · B[j, x_t]
            ψ_t(j) = argmax_i [δ_{t-1}(i) · A[i, j]]

        使用对数防止下溢:
            log δ_t(j) = max_i [log δ_{t-1}(i) + log A[i, j]] + log B[j, x_t]

        参数:
            observations: 观测序列,长度 T

        返回:
            log_prob: 最优路径的对数概率
            best_path: 最优状态序列(索引列表),长度 T
            deltas: log δ 矩阵 (T, N)
        """
        T = len(observations)
        deltas = np.zeros((T, self.N))  # log δ_t(i)
        psi = np.zeros((T, self.N), dtype=int)  # 回溯指针 ψ_t(i)

        # 初始化 t=1
        o1 = observations[0]
        for i in range(self.N):
            deltas[0, i] = np.log(self.pi[i] + 1e-300) + np.log(self.B[i, o1] + 1e-300)

        # 递推 t = 2, ..., T
        for t in range(1, T):
            ot = observations[t]
            for j in range(self.N):
                # log δ_t(j) = max_i [log δ_{t-1}(i) + log A[i, j]] + log B[j, ot]
                candidates = deltas[t-1, :] + np.log(self.A[:, j] + 1e-300)
                best_i = np.argmax(candidates)
                deltas[t, j] = candidates[best_i] + np.log(self.B[j, ot] + 1e-300)
                psi[t, j] = best_i

        # 终止:找到最优路径的终点
        best_final_state = np.argmax(deltas[T-1, :])
        log_prob = deltas[T-1, best_final_state]

        # 回溯
        best_path = np.zeros(T, dtype=int)
        best_path[T-1] = best_final_state
        for t in range(T-2, -1, -1):
            best_path[t] = psi[t+1, best_path[t+1]]

        return log_prob, best_path, deltas


# ============================================================================
# 第二部分:POS Tagging 示例
# ============================================================================

def pos_tagging_demo():
    """
    用简化的 POS 标注示例演示 HMM 的 Forward 和 Viterbi 算法。

    状态 = 词性标签: [DET(0=限定词), NOUN(1=名词), VERB(2=动词)]
    观测 = 词汇:     [0=the, 1=cat, 2=sat, 3=mat]
    """
    # 定义 HMM 参数
    N = 3  # 状态数: DET, NOUN, VERB
    M = 4  # 观测数: the(0), cat(1), sat(2), mat(3)

    # 转移矩阵 A[i, j] = P(Z_{t+1}=j | Z_t=i)
    #        DET   NOUN  VERB
    A = np.array([
        [0.1, 0.8, 0.1],  # DET → 通常后接 NOUN
        [0.1, 0.3, 0.6],  # NOUN → 通常后接 VERB
        [0.4, 0.4, 0.2],  # VERB → 可接 DET 或 NOUN
    ])

    # 发射矩阵 B[i, o] = P(X_t=o | Z_t=i)
    #        the  cat  sat  mat
    B = np.array([
        [0.8, 0.05, 0.1, 0.05],  # DET  → 通常产生 "the"
        [0.05, 0.6, 0.1, 0.25],  # NOUN → 通常产生 "cat"/"mat"
        [0.05, 0.1, 0.8, 0.05],  # VERB → 通常产生 "sat"
    ])

    # 初始分布 π[i] = P(Z_1=i)
    pi = np.array([0.5, 0.3, 0.2])  # 句子大概率以 DET 开头

    hmm = HMM(A, B, pi)

    # 观测: "the cat sat on the mat" → 只取 the(0), cat(1), sat(2), the(0), mat(3)
    observations = [0, 1, 2, 0, 3]  # the, cat, sat, the, mat

    # 前向算法
    log_prob, alphas, c = hmm.forward(observations)
    print(f'  观测序列: the cat sat the mat')
    print(f'  log P(X | λ) = {log_prob:.4f}')

    # Viterbi 解码
    log_prob_v, best_path, deltas = hmm.viterbi(observations)
    state_names = ['DET', 'NOUN', 'VERB']
    obs_names = ['the', 'cat', 'sat', 'the', 'mat']
    print(f'  Viterbi 最优路径: {" ".join(state_names[s] for s in best_path)}')
    print(f'  log P*(X, Z) = {log_prob_v:.4f}')

    # 可视化格子图
    plot_pos_trellis(alphas, deltas, best_path, state_names, obs_names,
                     observations)

    return alphas, best_path


def plot_pos_trellis(alphas, deltas, best_path, state_names, obs_names, observations):
    """
    绘制 POS tagging 的格子图(trellis),展示 Forward 和 Viterbi 权重。
    """
    T = len(observations)
    N = len(state_names)

    fig, axes = plt.subplots(1, 2, figsize=(16, 5))

    # 左图: Forward α 热度图
    ax1 = axes[0]
    im1 = ax1.imshow(alphas.T, aspect='auto', cmap='YlOrRd', origin='upper')
    ax1.set_xticks(range(T))
    ax1.set_xticklabels([obs_names[o] for o in observations], fontsize=10)
    ax1.set_yticks(range(N))
    ax1.set_yticklabels(state_names, fontsize=10)
    ax1.set_xlabel('Time (Observation)')
    ax1.set_ylabel('Hidden State')
    ax1.set_title('Forward α (scaled)')

    # 在格子上标注数值
    for i in range(N):
        for t in range(T):
            ax1.text(t, i, f'{alphas[t, i]:.3f}', ha='center', va='center',
                     fontsize=8, color='black' if alphas[t, i] < 0.6 else 'white')

    plt.colorbar(im1, ax=ax1, shrink=0.8)

    # 右图: Viterbi 路径
    ax2 = axes[1]
    im2 = ax2.imshow(np.exp(deltas).T, aspect='auto', cmap='YlOrRd', origin='upper')
    ax2.set_xticks(range(T))
    ax2.set_xticklabels([obs_names[o] for o in observations], fontsize=10)
    ax2.set_yticks(range(N))
    ax2.set_yticklabels(state_names, fontsize=10)
    ax2.set_xlabel('Time (Observation)')
    ax2.set_ylabel('Hidden State')
    ax2.set_title('Viterbi δ (exp) with Best Path')

    # 画最优路径
    for t in range(T - 1):
        ax2.annotate('', xy=(t + 1, best_path[t + 1]), xytext=(t, best_path[t]),
                     arrowprops=dict(arrowstyle='->', color='blue', lw=2.5,
                                     connectionstyle='arc3,rad=0'))
    # 标注路径节点
    for t in range(T):
        ax2.plot(t, best_path[t], 'o', color='blue', markersize=10, markeredgecolor='white')

    plt.colorbar(im2, ax=ax2, shrink=0.8)
    plt.tight_layout()
    path = os.path.join(_IMAGES_DIR, 'ml11-04-pos-trellis.png')
    fig.savefig(path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f'[保存] {path}')


# ============================================================================
# 第三部分:马尔可夫链可视化
# ============================================================================

def plot_markov_chain_example():
    """绘制一个简单的 3 状态马尔可夫链及其平稳分布"""
    # 转移矩阵
    A = np.array([
        [0.5, 0.3, 0.2],
        [0.2, 0.6, 0.2],
        [0.1, 0.3, 0.6],
    ])

    # 计算平稳分布:解 πA = π,即 π(A - I) = 0,且 Σπ = 1
    # 等价于求 A^T 的特征值 1 对应的特征向量
    eigenvalues, eigenvectors = np.linalg.eig(A.T)
    idx = np.argmin(np.abs(eigenvalues - 1.0))
    pi = np.real(eigenvectors[:, idx])
    pi = pi / pi.sum()

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # 左图:转移图
    ax1 = axes[0]
    # 三个状态节点的坐标(三角形排列)
    positions = {0: (0, 1), 1: (1, 0), 2: (2, 1)}
    state_labels = ['State 1\n(Sunny)', 'State 2\n(Cloudy)', 'State 3\n(Rainy)']
    colors = ['#FFD700', '#B0C4DE', '#6B8E23']

    # 画节点
    for s in range(3):
        x, y = positions[s]
        circle = plt.Circle((x, y), 0.15, color=colors[s], ec='black', linewidth=1.5,
                            zorder=2)
        ax1.add_patch(circle)
        ax1.text(x, y, str(s+1), ha='center', va='center', fontsize=12, fontweight='bold')
        ax1.text(x, y - 0.3, state_labels[s], ha='center', va='top', fontsize=9)

    # 画转移箭头
    for i in range(3):
        for j in range(3):
            if A[i, j] > 0.01:
                xi, yi = positions[i]
                xj, yj = positions[j]

                if i == j:
                    # 自环
                    ax1.annotate('', xy=(xi + 0.08, yi + 0.12), xytext=(xi - 0.08, yi + 0.12),
                                 arrowprops=dict(arrowstyle='->', color='gray',
                                                 connectionstyle='arc3,rad=0.5', lw=1.5))
                    ax1.text(xi, yi + 0.28, f'{A[i,j]:.1f}', ha='center', fontsize=8, color='gray')
                else:
                    ax1.annotate('', xy=(xj, yj), xytext=(xi, yi),
                                 arrowprops=dict(arrowstyle='->', color='gray',
                                                 connectionstyle='arc3,rad=0.2',
                                                 lw=A[i,j]*4, alpha=0.7))
                    mx, my = (xi + xj) / 2, (yi + yj) / 2
                    offset = 0.08 if abs(xi - xj) < 0.1 else 0
                    ax1.text(mx, my + 0.1, f'{A[i,j]:.1f}', ha='center', fontsize=9, color='navy')

    ax1.set_xlim(-0.5, 2.5)
    ax1.set_ylim(-0.3, 1.5)
    ax1.set_aspect('equal')
    ax1.axis('off')
    ax1.set_title('Markov Chain Transition Diagram\nA = Transition Matrix')

    # 右图:平稳分布
    ax2 = axes[1]
    bars = ax2.bar(range(3), pi, color=colors, edgecolor='black', linewidth=1)
    ax2.set_xticks(range(3))
    ax2.set_xticklabels(['Sunny', 'Cloudy', 'Rainy'])
    ax2.set_ylabel('Stationary Probability')
    ax2.set_title('Stationary Distribution π')
    for bar, val in zip(bars, pi):
        ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                 f'{val:.3f}', ha='center', fontsize=10)
    ax2.set_ylim(0, max(pi) * 1.2)

    plt.tight_layout()
    path = os.path.join(_IMAGES_DIR, 'ml11-05-markov-chain.png')
    fig.savefig(path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f'[保存] {path}')


# ============================================================================
# 第四部分:HMM 在不同转移矩阵下的行为
# ============================================================================

def compare_hmm_behaviors():
    """对比不同转移矩阵下 HMM 的状态序列特性"""
    observations = [0, 1, 2, 0, 3, 1, 2]  # 7 个观测

    # 场景 1:平滑转移(相邻状态间高转移概率)
    A_smooth = np.array([
        [0.7, 0.2, 0.1],
        [0.1, 0.7, 0.2],
        [0.1, 0.2, 0.7],
    ])

    # 场景 2:尖峰转移(只向特定状态转移)
    A_peaky = np.array([
        [0.1, 0.85, 0.05],
        [0.05, 0.1, 0.85],
        [0.85, 0.05, 0.1],
    ])

    B = np.array([
        [0.8, 0.1, 0.1, 0.0],
        [0.1, 0.6, 0.2, 0.1],
        [0.05, 0.15, 0.7, 0.1],
    ])
    pi = np.array([0.33, 0.34, 0.33])

    hmm1 = HMM(A_smooth, B, pi)
    hmm2 = HMM(A_peaky, B, pi)

    _, path1, _ = hmm1.viterbi(observations)
    _, path2, _ = hmm2.viterbi(observations)

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    state_names = ['S1', 'S2', 'S3']
    T = len(observations)

    for idx, (ax, path, A, title) in enumerate([
        (axes[0], path1, A_smooth, 'Smooth Transitions\n(high self-transition)'),
        (axes[1], path2, A_peaky, 'Peaky Transitions\n(forced cycle S1→S2→S3→S1)'),
    ]):
        # 画转移矩阵热力图
        im = ax.imshow(A, cmap='YlOrRd', vmin=0, vmax=1)
        for i in range(3):
            for j in range(3):
                ax.text(j, i, f'{A[i,j]:.2f}', ha='center', va='center', fontsize=9,
                        color='black' if A[i, j] < 0.6 else 'white')
        ax.set_xticks(range(3))
        ax.set_yticks(range(3))
        ax.set_xticklabels(state_names)
        ax.set_yticklabels(state_names)
        ax.set_xlabel('To State')
        ax.set_ylabel('From State')
        ax.set_title(title)

        # 标注 Viterbi 路径
        path_str = ' → '.join([state_names[p] for p in path])
        ax.text(1.5, -0.4, f'Viterbi: {path_str}', ha='center', fontsize=11,
                transform=ax.transData, color='blue',
                bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))

    plt.tight_layout()
    path = os.path.join(_IMAGES_DIR, 'ml11-06-hmm-behaviors.png')
    fig.savefig(path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f'[保存] {path}')


# ============================================================================
# 第五部分:主程序
# ============================================================================

def main():
    print('=' * 60)
    print('ml11 隐马尔可夫模型 (HMM) — 演示代码')
    print('=' * 60)

    # 1. POS 标注 Demo
    print('\n[1/3] POS Tagging Demo (Forward + Viterbi)...')
    pos_tagging_demo()

    # 2. 马尔可夫链可视化
    print('[2/3] 马尔可夫链与平稳分布...')
    plot_markov_chain_example()

    # 3. 不同转移矩阵下的行为对比
    print('[3/3] 转移矩阵对状态序列的影响...')
    compare_hmm_behaviors()

    print('\n完成!所有图表已保存到 images/ 目录。')


if __name__ == '__main__':
    main()