找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2198

积分

0

好友

316

主题
发表于 前天 02:57 | 查看: 13| 回复: 0

那天晚上下班挺晚的,大概十一点多,我在公司楼下等外卖。手机那边,我们组小李还在吐槽:“哥,我这个脚本要跑一个小时,老板说让‘搞搞并行’,你说这玩意儿不是一句for就完了吗,还并行啥啊?”

我就让他把代码甩过来看了一眼,典型的“一个 for 把一堆活全干了”的那种:

def process_file(path: str) -> dict:
    # 模拟处理一下
    import time, hashlib
    time.sleep(0.5)
    return {
        "path": path,
        "md5": hashlib.md5(path.encode()).hexdigest(),
    }

paths = [f"file_{i}.txt" for i in range(20)]

results = [process_file(p) for p in paths]
print(results)

20 个文件,每个模拟处理耗时 0.5 秒,串行执行就是 10 秒往上,这还只是测试环境。如果真在服务器上扫描处理几千个日志文件,可能人都下班了脚本还没跑完。

我当时就跟他说,其实你这活儿,改“一行”代码基本就能让程序在多核上飞起来。当然,这一行前后需要搭建点简单的骨架,否则 Python 不知道你想干嘛。

并行加速的原理:CPU密集与I/O密集

在动手之前,我们得先理清一个关键概念,以免后续思路打架:

  • CPU密集型任务:如果你的函数计算量非常大,例如压缩图片、进行大量数学运算,这类任务主要消耗CPU资源。
  • I/O密集型任务:如果你的函数大部分时间在等待,例如读取文件、查询数据库、发起HTTP网络请求,这类任务主要耗时在输入/输出上。

这两类任务在 Python 中实现并行的路径是不同的:

  • I/O密集型:使用多线程就能获得显著加速。因为线程在等待 I/O 操作(如磁盘读写、网络响应)时,Python 的 GIL(全局解释器锁) 会被释放,其他线程可以趁机执行。
  • CPU密集型:必须使用多进程,才能绕过 GIL 的限制,真正利用多个CPU核心进行计算。

所以,我当时先问了小李一句:“你这个 process_file 主要是在干嘛?读磁盘加 sleep 模拟等待?”他说差不多,主要是读一堆日志文件,里面还夹杂着一些 HTTP 请求。那这就好办了,属于典型的I/O密集型,用多线程一行代码就能搞定。

一行实现线程池并行:把 for 换成 map

我直接把他那段代码改成了下面这样(核心真的就一行,稍后解释):

from concurrent.futures import ThreadPoolExecutor

def process_file(path: str) -> dict:
    import time, hashlib
    time.sleep(0.5)
    return {
        "path": path,
        "md5": hashlib.md5(path.encode()).hexdigest(),
    }

paths = [f"file_{i}.txt" for i in range(20)]

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(process_file, paths))  # ← 关键的一行并行

print(results)

你看,原来的串行逻辑是:

results = [process_file(p) for p in paths]

现在变成了:

results = list(pool.map(process_file, paths))

核心逻辑没变:依然是“对每个 path 调用一次 process_file 函数,再把所有结果收集成一个列表”。但区别在于,现在是线程池里的 8 个线程(max_workers=8)在协同工作,而不是主线程一个人顺序干 20 次。

如果你有“代码行数强迫症”,非要追求物理上的“一行版本”,在脚本里这么写也是合法的(尽管可读性会变差,不推荐实际使用):

from concurrent.futures import ThreadPoolExecutor; results = list(ThreadPoolExecutor(max_workers=8).map(process_file, paths))

这确实实现了“一行 Python 代码完成并行”,但一般不建议这样写,否则日后同事 Review 代码时可能会想找你“聊聊”。

性能对比:效果立竿见影

我当时一边等外卖一边敲了个简单的对比 Demo 发给他。你也可以在自己的电脑上跑一下,感受速度差异:

import time
from concurrent.futures import ThreadPoolExecutor

def work(x: int) -> int:
    time.sleep(0.5)  # 模拟 I/O 等待
    return x * x

numbers = list(range(20))

# 串行版本
start = time.perf_counter()
serial = [work(n) for n in numbers]
print("serial cost:", time.perf_counter() - start)

# 并行版本(关键一行)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    parallel = list(pool.map(work, numbers))
print("parallel cost:", time.perf_counter() - start)

print(serial == parallel)

你大概能看到类似这样的输出(关注量级即可):

  • 串行耗时:接近 10 秒(20个任务 * 0.5秒)
  • 并行耗时:大约 1.5 ~ 2 秒

并且验证结果 serial == parallel 返回 True,说明并行处理并没有改变最终结果的正确性,但时间直接缩短了好几倍。

关键的速度提升就来自那一行:

parallel = list(pool.map(work, numbers))

我跟小李说,你只需要记住一个核心心法:

“以后看到用 for 循环处理一堆相互独立小任务的地方,就先想想能不能把它改成 pool.map(func, tasks)。”

至于线程池、上下文管理器 (with),那些都是外围的标准包装。

CPU密集型任务怎么办?换进程池!

说完I/O密集型,我想起前两天还有个兄弟在做图片批量缩放,用的是 Pillow 库。他直接写了个 for 循环,导致一个CPU核心满载,其他核心在“摸鱼”。那种场景就不能再用线程池了,必须换成进程池。

我直接给了他一个改造后的版本:

from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from PIL import Image

def resize_image(path: str) -> str:
    img = Image.open(path)
    img = img.resize((256, 256))
    out = Path("output") / Path(path).name
    out.parent.mkdir(parents=True, exist_ok=True)
    img.save(out)
    return str(out)

if __name__ == "__main__":  # 注意这个!多进程编程必须要有
    images = [str(p) for p in Path("images").glob("*.jpg")]

    with ProcessPoolExecutor() as pool:
        results = list(pool.map(resize_image, images))  # 这里一行实现多进程并行
    print(results)

这个版本想强调的是:

  1. ThreadPoolExecutor 换成 ProcessPoolExecutor
  2. 入口必须加上 if __name__ == "__main__":,尤其是在 Windows 系统上,这是为了避免子进程无限递归创建,导致程序崩溃。
  3. 本质依然是:一个 map 把原来的 for 循环变成了“多个进程一起跑”

其背后的思想,与高并发系统设计中的任务分发与调度有异曲同工之妙。核心思路不变:把“要处理的一堆任务”变成一个可迭代对象,然后用一行 Executor().map(func, iterable) 抛给池子去并行处理。

关键概念:GIL 到底要不要管?

我知道很多人脑子里会立刻蹦出“GIL”这个词。小李当时也是,上来就说:“Python 有 GIL(全局解释器锁),多线程不就没用了吗?”

我就跟他简单解释了一下:

  • GIL 会限制纯 Python 代码在同一时刻只能被一个线程执行。
  • 但是,当线程在执行I/O操作(如网络请求 requests.get()、文件读取 .read())或调用某些底层 C 库函数时,GIL 会被释放,此时其他线程就可以获得执行权。

所以结论很清晰:

  • 如果你的任务大部分时间在等待 I/Otime.sleep, requests.get, 文件 read/write),使用 ThreadPoolExecutor(线程池)效果会非常好。
  • 如果你的任务大部分时间在进行纯 Python 计算(数学循环、加解密),就必须使用 ProcessPoolExecutor(进程池)才能有效利用多核。

理解这一点,对于在Python中编写高效的并行程序至关重要。你只需要在脑子里做个简单判断,不必把 GIL 想得过于神秘。

“一行并行”的常见陷阱与注意事项

那天我和小李语音聊了十几分钟,顺便把几个常见的坑提前说了,你也值得留意:

  1. 函数必须可序列化(针对多进程)

    • 使用 ProcessPoolExecutor 时,你传递给 map 的函数(func)必须是模块顶层定义的函数,不能是 lambda 匿名函数或嵌套在其他函数内部的函数。
    • 否则,在跨进程传递时会因无法序列化(pickling)而直接报错。
  2. 谨慎对待全局变量(尤其多进程)

    • 多线程间共享全局变量还相对容易(需注意线程安全),但多进程时,每个进程拥有独立的内存空间。在子进程中修改全局变量,主进程是无法感知的。
    • 最佳实践是老老实实通过参数传入数据,再通过返回值传出结果,完全依赖 map 的输入输出。
  3. 任务太少别用池子

    • 如果你只有3个任务,却开一个16个线程/进程的池子,创建和销毁池子、上下文切换的开销反而可能抵消并行带来的收益。
    • 通常,我会用 max_workers = min(32, os.cpu_count() * 5) 这样的经验公式来设置工作线程/进程数,避免过度浪费资源。
  4. 如何添加进度条

    • 如果你希望实时看到任务执行进度,可以配合 tqdm 库。这也是典型的一行并行加一行进度条,体验很好。
      
      from concurrent.futures import ThreadPoolExecutor
      from tqdm import tqdm

    with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(tqdm(pool.map(process_file, paths), total=len(paths)))

  5. 异常处理

    • map 中,如果某个任务抛出了异常,这个异常会在你尝试获取该任务结果时被抛出。
    • 如果需要更精细的异常处理(例如记录哪个任务失败了但继续执行其他任务),可以使用 executor.submit() 配合 as_completed(),但那就不止一行代码了。

终极模板:一行并行的通用脚本

那天外卖到了,我走回家的路上,顺手给他丢了一个最简化、可复用的模板。他后来新写的“批量处理脚本”基本都照这个抄,你也可以收藏备用:

# 示例:CPU密集型任务模板 (cpu_bound.py)
from concurrent.futures import ProcessPoolExecutor
import time
from math import sqrt

def heavy(x: int) -> float:
    # 模拟重计算任务
    s = 0
    for i in range(50_000):
        s += sqrt((x + i) % 1000)
    return s

if __name__ == "__main__":
    tasks = list(range(16))

    start = time.perf_counter()
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(heavy, tasks))  # ← 一行实现多进程并行
    print("cost:", time.perf_counter() - start)
    print(results)

你要把它改成 I/O 密集型版本,只需做两处替换:

  1. ProcessPoolExecutor 换成 ThreadPoolExecutor
  2. heavy 函数里那堆计算循环换成 time.sleep(0.5)requests.get(url) 之类的 I/O 操作即可。

整篇文章说了这么多,其实就是想在你脑子里固化一个非常具体的思维模式:

以后但凡看到那种“for 循环处理一大堆相互独立小任务”的 Python 代码,就先在心里问一句:“能不能把它改成 Executor().map(func, tasks) 这一行?”

如果能改,那它大概率就具备了并行化的潜力。至于到底该选用线程池 (ThreadPoolExecutor) 还是进程池 (ProcessPoolExecutor),就看你那段核心逻辑主要是在“计算”(CPU bound),还是在“等待”(I/O bound)。

掌握这一招,你就能轻松应对许多常见的性能优化场景。这种将任务分解并映射到多个执行单元的思想,其根源也来自于算法设计中的分治策略,而多进程机制则深深依赖于操作系统提供的进程管理与隔离能力。

希望这个简单的技巧能帮你和你的脚本“跑”得更快。更多关于Python进阶和并发编程的深度讨论,欢迎来云栈社区交流分享。




上一篇:从某节10-10工作制跑路,到二叉树“最大层内元素和”题解
下一篇:Spring Boot集成Elasticsearch实战:从核心概念到复杂搜索功能实现
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-14 18:54 , Processed in 0.491917 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表