那天晚上下班挺晚的,大概十一点多,我在公司楼下等外卖。手机那边,我们组小李还在吐槽:“哥,我这个脚本要跑一个小时,老板说让‘搞搞并行’,你说这玩意儿不是一句for就完了吗,还并行啥啊?”
我就让他把代码甩过来看了一眼,典型的“一个 for 把一堆活全干了”的那种:
def process_file(path: str) -> dict:
# 模拟处理一下
import time, hashlib
time.sleep(0.5)
return {
"path": path,
"md5": hashlib.md5(path.encode()).hexdigest(),
}
paths = [f"file_{i}.txt" for i in range(20)]
results = [process_file(p) for p in paths]
print(results)
20 个文件,每个模拟处理耗时 0.5 秒,串行执行就是 10 秒往上,这还只是测试环境。如果真在服务器上扫描处理几千个日志文件,可能人都下班了脚本还没跑完。
我当时就跟他说,其实你这活儿,改“一行”代码基本就能让程序在多核上飞起来。当然,这一行前后需要搭建点简单的骨架,否则 Python 不知道你想干嘛。
并行加速的原理:CPU密集与I/O密集
在动手之前,我们得先理清一个关键概念,以免后续思路打架:
- CPU密集型任务:如果你的函数计算量非常大,例如压缩图片、进行大量数学运算,这类任务主要消耗CPU资源。
- I/O密集型任务:如果你的函数大部分时间在等待,例如读取文件、查询数据库、发起HTTP网络请求,这类任务主要耗时在输入/输出上。
这两类任务在 Python 中实现并行的路径是不同的:
- I/O密集型:使用多线程就能获得显著加速。因为线程在等待 I/O 操作(如磁盘读写、网络响应)时,Python 的 GIL(全局解释器锁) 会被释放,其他线程可以趁机执行。
- CPU密集型:必须使用多进程,才能绕过 GIL 的限制,真正利用多个CPU核心进行计算。
所以,我当时先问了小李一句:“你这个 process_file 主要是在干嘛?读磁盘加 sleep 模拟等待?”他说差不多,主要是读一堆日志文件,里面还夹杂着一些 HTTP 请求。那这就好办了,属于典型的I/O密集型,用多线程一行代码就能搞定。
一行实现线程池并行:把 for 换成 map
我直接把他那段代码改成了下面这样(核心真的就一行,稍后解释):
from concurrent.futures import ThreadPoolExecutor
def process_file(path: str) -> dict:
import time, hashlib
time.sleep(0.5)
return {
"path": path,
"md5": hashlib.md5(path.encode()).hexdigest(),
}
paths = [f"file_{i}.txt" for i in range(20)]
with ThreadPoolExecutor(max_workers=8) as pool:
results = list(pool.map(process_file, paths)) # ← 关键的一行并行
print(results)
你看,原来的串行逻辑是:
results = [process_file(p) for p in paths]
现在变成了:
results = list(pool.map(process_file, paths))
核心逻辑没变:依然是“对每个 path 调用一次 process_file 函数,再把所有结果收集成一个列表”。但区别在于,现在是线程池里的 8 个线程(max_workers=8)在协同工作,而不是主线程一个人顺序干 20 次。
如果你有“代码行数强迫症”,非要追求物理上的“一行版本”,在脚本里这么写也是合法的(尽管可读性会变差,不推荐实际使用):
from concurrent.futures import ThreadPoolExecutor; results = list(ThreadPoolExecutor(max_workers=8).map(process_file, paths))
这确实实现了“一行 Python 代码完成并行”,但一般不建议这样写,否则日后同事 Review 代码时可能会想找你“聊聊”。
性能对比:效果立竿见影
我当时一边等外卖一边敲了个简单的对比 Demo 发给他。你也可以在自己的电脑上跑一下,感受速度差异:
import time
from concurrent.futures import ThreadPoolExecutor
def work(x: int) -> int:
time.sleep(0.5) # 模拟 I/O 等待
return x * x
numbers = list(range(20))
# 串行版本
start = time.perf_counter()
serial = [work(n) for n in numbers]
print("serial cost:", time.perf_counter() - start)
# 并行版本(关键一行)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
parallel = list(pool.map(work, numbers))
print("parallel cost:", time.perf_counter() - start)
print(serial == parallel)
你大概能看到类似这样的输出(关注量级即可):
- 串行耗时:接近 10 秒(20个任务 * 0.5秒)
- 并行耗时:大约 1.5 ~ 2 秒
并且验证结果 serial == parallel 返回 True,说明并行处理并没有改变最终结果的正确性,但时间直接缩短了好几倍。
关键的速度提升就来自那一行:
parallel = list(pool.map(work, numbers))
我跟小李说,你只需要记住一个核心心法:
“以后看到用 for 循环处理一堆相互独立小任务的地方,就先想想能不能把它改成 pool.map(func, tasks)。”
至于线程池、上下文管理器 (with),那些都是外围的标准包装。
CPU密集型任务怎么办?换进程池!
说完I/O密集型,我想起前两天还有个兄弟在做图片批量缩放,用的是 Pillow 库。他直接写了个 for 循环,导致一个CPU核心满载,其他核心在“摸鱼”。那种场景就不能再用线程池了,必须换成进程池。
我直接给了他一个改造后的版本:
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from PIL import Image
def resize_image(path: str) -> str:
img = Image.open(path)
img = img.resize((256, 256))
out = Path("output") / Path(path).name
out.parent.mkdir(parents=True, exist_ok=True)
img.save(out)
return str(out)
if __name__ == "__main__": # 注意这个!多进程编程必须要有
images = [str(p) for p in Path("images").glob("*.jpg")]
with ProcessPoolExecutor() as pool:
results = list(pool.map(resize_image, images)) # 这里一行实现多进程并行
print(results)
这个版本想强调的是:
- 把
ThreadPoolExecutor 换成 ProcessPoolExecutor。
- 入口必须加上
if __name__ == "__main__":,尤其是在 Windows 系统上,这是为了避免子进程无限递归创建,导致程序崩溃。
- 本质依然是:一个
map 把原来的 for 循环变成了“多个进程一起跑”。
其背后的思想,与高并发系统设计中的任务分发与调度有异曲同工之妙。核心思路不变:把“要处理的一堆任务”变成一个可迭代对象,然后用一行 Executor().map(func, iterable) 抛给池子去并行处理。
关键概念:GIL 到底要不要管?
我知道很多人脑子里会立刻蹦出“GIL”这个词。小李当时也是,上来就说:“Python 有 GIL(全局解释器锁),多线程不就没用了吗?”
我就跟他简单解释了一下:
- GIL 会限制纯 Python 代码在同一时刻只能被一个线程执行。
- 但是,当线程在执行I/O操作(如网络请求
requests.get()、文件读取 .read())或调用某些底层 C 库函数时,GIL 会被释放,此时其他线程就可以获得执行权。
所以结论很清晰:
- 如果你的任务大部分时间在等待 I/O(
time.sleep, requests.get, 文件 read/write),使用 ThreadPoolExecutor(线程池)效果会非常好。
- 如果你的任务大部分时间在进行纯 Python 计算(数学循环、加解密),就必须使用
ProcessPoolExecutor(进程池)才能有效利用多核。
理解这一点,对于在Python中编写高效的并行程序至关重要。你只需要在脑子里做个简单判断,不必把 GIL 想得过于神秘。
“一行并行”的常见陷阱与注意事项
那天我和小李语音聊了十几分钟,顺便把几个常见的坑提前说了,你也值得留意:
-
函数必须可序列化(针对多进程)
- 使用
ProcessPoolExecutor 时,你传递给 map 的函数(func)必须是模块顶层定义的函数,不能是 lambda 匿名函数或嵌套在其他函数内部的函数。
- 否则,在跨进程传递时会因无法序列化(pickling)而直接报错。
-
谨慎对待全局变量(尤其多进程)
- 多线程间共享全局变量还相对容易(需注意线程安全),但多进程时,每个进程拥有独立的内存空间。在子进程中修改全局变量,主进程是无法感知的。
- 最佳实践是老老实实通过参数传入数据,再通过返回值传出结果,完全依赖
map 的输入输出。
-
任务太少别用池子
- 如果你只有3个任务,却开一个16个线程/进程的池子,创建和销毁池子、上下文切换的开销反而可能抵消并行带来的收益。
- 通常,我会用
max_workers = min(32, os.cpu_count() * 5) 这样的经验公式来设置工作线程/进程数,避免过度浪费资源。
-
如何添加进度条
with ThreadPoolExecutor(max_workers=8) as pool:
results = list(tqdm(pool.map(process_file, paths), total=len(paths)))
-
异常处理
- 在
map 中,如果某个任务抛出了异常,这个异常会在你尝试获取该任务结果时被抛出。
- 如果需要更精细的异常处理(例如记录哪个任务失败了但继续执行其他任务),可以使用
executor.submit() 配合 as_completed(),但那就不止一行代码了。
终极模板:一行并行的通用脚本
那天外卖到了,我走回家的路上,顺手给他丢了一个最简化、可复用的模板。他后来新写的“批量处理脚本”基本都照这个抄,你也可以收藏备用:
# 示例:CPU密集型任务模板 (cpu_bound.py)
from concurrent.futures import ProcessPoolExecutor
import time
from math import sqrt
def heavy(x: int) -> float:
# 模拟重计算任务
s = 0
for i in range(50_000):
s += sqrt((x + i) % 1000)
return s
if __name__ == "__main__":
tasks = list(range(16))
start = time.perf_counter()
with ProcessPoolExecutor() as pool:
results = list(pool.map(heavy, tasks)) # ← 一行实现多进程并行
print("cost:", time.perf_counter() - start)
print(results)
你要把它改成 I/O 密集型版本,只需做两处替换:
- 把
ProcessPoolExecutor 换成 ThreadPoolExecutor。
- 把
heavy 函数里那堆计算循环换成 time.sleep(0.5) 或 requests.get(url) 之类的 I/O 操作即可。
整篇文章说了这么多,其实就是想在你脑子里固化一个非常具体的思维模式:
以后但凡看到那种“for 循环处理一大堆相互独立小任务”的 Python 代码,就先在心里问一句:“能不能把它改成 Executor().map(func, tasks) 这一行?”
如果能改,那它大概率就具备了并行化的潜力。至于到底该选用线程池 (ThreadPoolExecutor) 还是进程池 (ProcessPoolExecutor),就看你那段核心逻辑主要是在“计算”(CPU bound),还是在“等待”(I/O bound)。
掌握这一招,你就能轻松应对许多常见的性能优化场景。这种将任务分解并映射到多个执行单元的思想,其根源也来自于算法设计中的分治策略,而多进程机制则深深依赖于操作系统提供的进程管理与隔离能力。
希望这个简单的技巧能帮你和你的脚本“跑”得更快。更多关于Python进阶和并发编程的深度讨论,欢迎来云栈社区交流分享。