algo03 栈与队列 — demo.py 代码详解
运行方式
bash
cd algo03_stack_queue/code
python demo.py代码结构
| 类/函数 | 功能 | 重要性 |
|---|---|---|
ArrayStack / LinkedStack | 栈的两种实现 | 基础 |
is_valid_parentheses() | 括号匹配 | 面试经典题 |
infix_to_postfix() + evaluate_postfix() | 表达式求值 | Shunting-yard 算法 |
CircularQueue | 循环队列 | 固定大小队列的标准实现 |
next_greater_element() | 单调栈 | 将 O(n²) 优化到 O(n) |
sliding_window_maximum() | 单调队列 | LeetCode 239 经典 |
MinHeap | 最小堆 | 优先队列的底层实现 |
第1步:栈的两种实现
ArrayStack 直接使用 Python list,栈顶 = list 末尾。push = append, pop = pop()。
LinkedStack 用链表实现,栈顶 = 链表头。push 在头部插入,pop 从头部删除,都是 O(1)。
第2步:Shunting-yard 算法核心
Dijkstra 的 Shunting-yard 算法由三个规则驱动:
操作数 → 直接输出
'(' → 入栈
')' → 弹出栈直到遇到 '('
运算符 → 弹出栈中所有优先级 >= 当前运算符的运算符,然后当前入栈为什么 '(' 优先级最低? 因为 '(' 只有在遇到 ')' 时才弹出,其他运算符不应"越"过它。
第3步:循环队列的三个关键变量
class CircularQueue:
_capacity = k + 1 # 多一个位置区分空/满
_front = 0 # 队头索引
_rear = 0 # 队尾索引(下一个空位)
判空: front == rear
判满: (rear + 1) % capacity == front为什么要牺牲一个位置? 如果所有 k 个位置都存元素,front == rear 既可能是空也可能是满。牺牲一个位置后,front == rear 只能是空。
第4步:单调栈的 O(n) 分析
for i in range(n):
while stack and nums[i] > nums[stack[-1]]:
stack.pop() # 每个元素最多 pop 一次
stack.append(i) # 每个元素最多 push 一次总操作次数 = push n 次 + pop 最多 n 次 = O(n)
第5步:最小堆的数组表示
对于索引 i 的节点(根节点索引 0):
父节点: (i - 1) // 2
左子节点: 2*i + 1
右子节点: 2*i + 2
示例: [1, 3, 5, 7, 9]
树形: 1
/ \
3 5
/ \
7 9关键概念速查表
| 结构 | 规则 | push/enqueue | pop/dequeue | peek | 应用 |
|---|---|---|---|---|---|
| Stack | LIFO | O(1) | O(1) | O(1) | 括号匹配、DFS |
| Queue | FIFO | O(1) | O(1) | O(1) | BFS、消息队列 |
| Mono Stack | 单调 | O(1)均摊 | O(1)均摊 | - | 下一个更大元素 |
| Mono Queue | 单调 | O(1)均摊 | O(1)均摊 | O(1) | 滑动窗口最大值 |
| MinHeap | 堆序 | O(log n) | O(log n) | O(1) | 优先队列、堆排序 |
完整代码
py
# -*- coding: utf-8 -*-
"""
===============================================================================
algo03_stack_queue/code/demo.py — 栈与队列从零实现
===============================================================================
本演示从零实现以下数据结构与算法:
1. 栈(Stack)—— 基于 Python list 和自实现的链式栈
2. 队列(Queue)—— 循环队列
3. 单调栈 —— 下一个更大元素问题
4. 单调队列 —— 滑动窗口最大值
5. 表达式求值 —— 中缀转后缀(Shunting-yard)+ 后缀求值
6. 优先队列 —— 最小堆(二叉堆)
运行方式:python demo.py
依赖:matplotlib(仅用于可视化)
===============================================================================
"""
import os
import math
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['axes.unicode_minus'] = False
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_IMAGES_DIR = os.path.join(_SCRIPT_DIR, '..', 'images')
os.makedirs(_IMAGES_DIR, exist_ok=True)
# ============================================================================
# 第一部分:栈(Stack)—— 基于列表和链表两种实现
# ============================================================================
class ArrayStack:
"""基于 Python list 的栈实现(list 末尾 = 栈顶)"""
def __init__(self):
self._data = []
def push(self, item):
self._data.append(item)
def pop(self):
if self.is_empty():
raise IndexError('栈空,无法弹出')
return self._data.pop()
def peek(self):
if self.is_empty():
raise IndexError('栈空')
return self._data[-1]
def is_empty(self):
return len(self._data) == 0
def __len__(self):
return len(self._data)
def __repr__(self):
return f'Stack(bottom→{self._data}←top)'
class Node:
"""链表节点"""
def __init__(self, data):
self.data = data
self.next = None
class LinkedStack:
"""基于链表的栈实现(链表头 = 栈顶,push/pop 都是 O(1))"""
def __init__(self):
self._top = None
self._size = 0
def push(self, item):
new_node = Node(item)
new_node.next = self._top
self._top = new_node
self._size += 1
def pop(self):
if self.is_empty():
raise IndexError('栈空')
item = self._top.data
self._top = self._top.next
self._size -= 1
return item
def peek(self):
if self.is_empty():
raise IndexError('栈空')
return self._top.data
def is_empty(self):
return self._top is None
def __len__(self):
return self._size
# ============================================================================
# 第二部分:括号匹配
# ============================================================================
def is_valid_parentheses(s: str) -> bool:
"""判断括号字符串是否有效匹配。
支持: (), [], {}
时间复杂度 O(n),空间复杂度 O(n)
"""
stack = []
pairs = {')': '(', ']': '[', '}': '{'}
for ch in s:
if ch in '([{':
stack.append(ch)
elif ch in ')]}':
if not stack or stack[-1] != pairs[ch]:
return False
stack.pop()
return len(stack) == 0
# ============================================================================
# 第三部分:表达式求值(Shunting-yard 算法)
# ============================================================================
def infix_to_postfix(expression: str) -> str:
"""将中缀表达式转换为后缀表达式(Shunting-yard 算法)
支持的运算符: +, -, *, /, (, )
假定操作数为单个字母或数字字符
"""
precedence = {'+': 1, '-': 1, '*': 2, '/': 2}
output = []
stack = []
for token in expression:
if token == ' ':
continue
if token.isalnum(): # 操作数(字母或数字)
output.append(token)
elif token == '(':
stack.append(token)
elif token == ')':
while stack and stack[-1] != '(':
output.append(stack.pop())
if stack and stack[-1] == '(':
stack.pop() # 弹出 '('
elif token in precedence:
while (stack and stack[-1] != '(' and
precedence.get(stack[-1], 0) >= precedence[token]):
output.append(stack.pop())
stack.append(token)
while stack:
output.append(stack.pop())
return ''.join(output)
def evaluate_postfix(postfix: str) -> float:
"""计算后缀表达式的值
假定操作数为 0-9 的单个数字,运算符为 +, -, *, /
"""
stack = []
for token in postfix:
if token.isdigit():
stack.append(float(token))
elif token in '+-*/':
b = stack.pop()
a = stack.pop()
if token == '+':
stack.append(a + b)
elif token == '-':
stack.append(a - b)
elif token == '*':
stack.append(a * b)
elif token == '/':
stack.append(a / b)
return stack[0]
# ============================================================================
# 第四部分:循环队列
# ============================================================================
class CircularQueue:
"""循环队列(固定容量,牺牲一个位置区分空/满)"""
def __init__(self, capacity):
self._capacity = capacity + 1 # 多分配一个位置
self._data = [None] * self._capacity
self._front = 0 # 队头索引
self._rear = 0 # 队尾索引(指向下一个空位)
def is_empty(self):
return self._front == self._rear
def is_full(self):
return (self._rear + 1) % self._capacity == self._front
def enqueue(self, item):
if self.is_full():
raise OverflowError('队列已满')
self._data[self._rear] = item
self._rear = (self._rear + 1) % self._capacity
def dequeue(self):
if self.is_empty():
raise IndexError('队列空')
item = self._data[self._front]
self._data[self._front] = None
self._front = (self._front + 1) % self._capacity
return item
def peek(self):
if self.is_empty():
raise IndexError('队列空')
return self._data[self._front]
def __repr__(self):
return f'CircularQueue(front={self._front}, rear={self._rear}, data={self._data})'
# ============================================================================
# 第五部分:单调栈 —— 下一个更大元素
# ============================================================================
def next_greater_element(nums):
"""使用单调递减栈找出每个元素的下一个更大元素。
返回一个列表,result[i] 是 nums[i] 右侧第一个比它大的元素,没有则为 -1。
时间复杂度 O(n)
"""
n = len(nums)
result = [-1] * n
stack = [] # 存储索引,栈中元素对应的值单调递减
for i in range(n):
# nums[i] 比栈顶元素大 → 更新栈顶元素的答案
while stack and nums[i] > nums[stack[-1]]:
idx = stack.pop()
result[idx] = nums[i]
stack.append(i)
return result
# ============================================================================
# 第六部分:单调队列 —— 滑动窗口最大值
# ============================================================================
def sliding_window_maximum(nums, k):
"""使用单调递减队列找出每个滑动窗口的最大值。
时间复杂度 O(n)
"""
from collections import deque
result = []
dq = deque() # 存储索引,对应值单调递减
for i in range(len(nums)):
# 删除队尾所有比当前元素小的索引
while dq and nums[i] >= nums[dq[-1]]:
dq.pop()
dq.append(i)
# 删除超出窗口范围的队头元素
if dq[0] <= i - k:
dq.popleft()
# 窗口形成后记录结果
if i >= k - 1:
result.append(nums[dq[0]])
return result
# ============================================================================
# 第七部分:最小堆(二叉堆实现优先队列)
# ============================================================================
class MinHeap:
"""最小堆 —— 根节点是最小值"""
def __init__(self):
self._data = []
def push(self, item):
"""插入元素,上浮调整 —— O(log n)"""
self._data.append(item)
self._sift_up(len(self._data) - 1)
def pop(self):
"""弹出最小值,下沉调整 —— O(log n)"""
if self.is_empty():
raise IndexError('堆空')
if len(self._data) == 1:
return self._data.pop()
root = self._data[0]
self._data[0] = self._data.pop() # 末尾元素放到堆顶
self._sift_down(0)
return root
def peek(self):
"""查看最小值 —— O(1)"""
if self.is_empty():
raise IndexError('堆空')
return self._data[0]
def _sift_up(self, idx):
"""上浮:当前节点比父节点小时,与父节点交换"""
parent = (idx - 1) // 2
while idx > 0 and self._data[idx] < self._data[parent]:
self._data[idx], self._data[parent] = self._data[parent], self._data[idx]
idx = parent
parent = (idx - 1) // 2
def _sift_down(self, idx):
"""下沉:当前节点比子节点大时,与较小的子节点交换"""
n = len(self._data)
while True:
smallest = idx
left = 2 * idx + 1
right = 2 * idx + 2
if left < n and self._data[left] < self._data[smallest]:
smallest = left
if right < n and self._data[right] < self._data[smallest]:
smallest = right
if smallest == idx:
break
self._data[idx], self._data[smallest] = self._data[smallest], self._data[idx]
idx = smallest
def is_empty(self):
return len(self._data) == 0
def __len__(self):
return len(self._data)
def __repr__(self):
return f'MinHeap({self._data})'
def heap_sort(arr):
"""堆排序 —— O(n log n),原地排序"""
heap = MinHeap()
for x in arr:
heap.push(x)
result = []
while not heap.is_empty():
result.append(heap.pop())
return result
# ============================================================================
# 第八部分:可视化
# ============================================================================
def plot_monotonic_structures():
"""可视化单调栈和单调队列的算法过程"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 左上:单调栈示例
nums = [2, 1, 2, 4, 3]
result = next_greater_element(nums)
ax1 = axes[0, 0]
x = range(len(nums))
bars = ax1.bar(x, nums, color='#90CAF9', edgecolor='#1565C0', linewidth=1.5)
for i, (num, res) in enumerate(zip(nums, result)):
ax1.text(i, num + 0.1, f'{num}\n→{res}', ha='center', va='bottom', fontsize=10)
ax1.set_xticks(list(x))
ax1.set_xticklabels([str(i) for i in range(len(nums))])
ax1.set_title('单调栈:下一个更大元素', fontsize=14)
ax1.set_ylabel('值', fontsize=12)
# 右上:滑动窗口最大值
nums2 = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
swm = sliding_window_maximum(nums2, k)
ax2 = axes[0, 1]
x2 = range(len(nums2))
ax2.bar(x2, nums2, color='#A5D6A7', edgecolor='#2E7D32', linewidth=1)
# 标注窗口最大值
for i in range(k - 1, len(nums2)):
ax2.text(i, nums2[i] + 0.5, f'max={swm[i-k+1]}',
ha='center', fontsize=9, color='red', fontweight='bold')
ax2.set_xticks(list(x2))
ax2.set_title(f'单调队列:滑动窗口最大值 (k={k})', fontsize=14)
ax2.set_ylabel('值', fontsize=12)
# 左下:堆排序过程
ax3 = axes[1, 0]
arr = [7, 3, 9, 1, 5, 2, 8]
heap = MinHeap()
snapshots = []
for x in arr:
heap.push(x)
snapshots.append(list(heap._data))
ax3.plot(range(1, len(arr) + 1), arr, 'o-', label='原始数组', linewidth=2)
ax3.set_title('最小堆的构建过程', fontsize=14)
ax3.set_xlabel('元素索引', fontsize=12)
ax3.legend(fontsize=10)
ax3.grid(True, alpha=0.3)
# 右下:表达式求值过程表
ax4 = axes[1, 1]
ax4.axis('off')
table_data = [
['中缀', '3+4*2/(1-5)'],
['后缀', infix_to_postfix('3+4*2/(1-5)')],
['结果', str(evaluate_postfix(infix_to_postfix('3+4*2/(1-5)')))],
['', ''],
['括号匹配: "{[()]}"', str(is_valid_parentheses('{[()]}'))],
['括号匹配: "[(])"', str(is_valid_parentheses('[(])'))],
]
table = ax4.table(cellText=table_data, cellLoc='left',
colWidths=[0.3, 0.7], loc='center')
table.auto_set_font_size(False)
table.set_fontsize(12)
for key, cell in table.get_celld().items():
if key[0] == 0 or key[0] == 3:
cell.set_facecolor('#E3F2FD')
ax4.set_title('表达式求值与括号匹配', fontsize=14)
plt.tight_layout()
out = os.path.join(_IMAGES_DIR, 'stack_queue_demo.png')
fig.savefig(out, dpi=150, bbox_inches='tight')
plt.close(fig)
print(f'[图片保存] {out}')
# ============================================================================
# 主函数
# ============================================================================
def main():
print('=' * 60)
print(' 栈与队列 — 演示程序')
print('=' * 60)
# 1. 栈
print('\n--- 栈 ---')
stack = ArrayStack()
for x in [1, 2, 3]:
stack.push(x)
print(f'push 1,2,3 后: {stack}')
print(f'pop: {stack.pop()}, 栈变为: {stack}')
print(f'peek: {stack.peek()}')
# 2. 括号匹配
print('\n--- 括号匹配 ---')
test_cases = ['()', '()[]{}', '(]', '([)]', '{[]}', '']
for s in test_cases:
print(f' "{s}" → {"有效 ✅" if is_valid_parentheses(s) else "无效 ❌"}')
# 3. 表达式求值
print('\n--- 表达式求值 ---')
expr = '3+4*2'
postfix = infix_to_postfix(expr)
result = evaluate_postfix(postfix)
print(f'中缀: {expr}')
print(f'后缀: {postfix}')
print(f'结果: {result} (预期: 11)')
expr2 = '1+2*3-4/2'
postfix2 = infix_to_postfix(expr2)
print(f'\n中缀: {expr2}')
print(f'后缀: {postfix2}')
print(f'结果: {evaluate_postfix(postfix2)} (预期: 5.0)')
# 4. 循环队列
print('\n--- 循环队列 ---')
cq = CircularQueue(4)
for x in [10, 20, 30]:
cq.enqueue(x)
print(f'enqueue 10,20,30: {cq}')
print(f'dequeue: {cq.dequeue()} → {cq}')
cq.enqueue(40); cq.enqueue(50)
print(f'enqueue 40,50: {cq}')
print(f'队列满? {cq.is_full()}')
# 5. 单调栈
print('\n--- 单调栈:下一个更大元素 ---')
nums = [2, 1, 2, 4, 3]
print(f'输入: {nums}')
print(f'输出: {next_greater_element(nums)}')
# 6. 单调队列
print('\n--- 单调队列:滑动窗口最大值 ---')
nums2 = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
print(f'输入: {nums2}, k={k}')
print(f'输出: {sliding_window_maximum(nums2, k)}')
# 7. 最小堆
print('\n--- 最小堆 ---')
heap = MinHeap()
for x in [7, 3, 9, 1, 5]:
heap.push(x)
print(f'push 7,3,9,1,5: {heap}')
print(f'堆顶 (最小值): {heap.peek()}')
sorted_result = heap_sort([7, 3, 9, 1, 5, 2])
print(f'堆排序: {sorted_result}')
# 8. 可视化
plot_monotonic_structures()
print('\n✅ 所有演示已完成!图片保存在 images/ 目录中。')
if __name__ == '__main__':
main()