找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

377

积分

0

好友

51

主题
发表于 2025-11-27 14:48:51 | 查看: 24| 回复: 0

在编写Python程序时,您可能遇到过处理大数据耗时过长、爬取网页只能顺序执行或需要同时处理多个任务却不知如何实现的情况。这些都可以通过并发编程来解决。本文通过通俗比喻和极简代码,带您掌握Python中多进程、多线程和多协程三大并发工具,帮助程序运行效率显著提升。

一、先搞懂:什么是“并发”?

简单来说,并发就是让程序“同时处理多个任务”,类似于一边烹饪一边听音乐,无需等待一个任务完成再开始另一个。

Python中实现并发有三种常用方式,通过生活化比喻便于理解:

  • 多进程:如同开设多家独立工厂,每家工厂各自运作,互不干扰(充分利用多核CPU)
  • 多线程:好比一家工厂内的多位厨师,共享厨房设备,轮流执行任务(适合频繁等待的场景)
  • 多协程:类似一位灵活的服务员,同时照看多位顾客,在顾客等待时切换服务(资源占用极低,效率超高)

核心区别对比如下:

模式 核心比喻 适用场景 资源占用
多进程 多家独立工厂 持续计算任务(CPU密集型)
多线程 工厂内的多位厨师 频繁等待任务(I/O密集型)
多协程 灵活的服务员 高并发等待任务(万级请求) 极低

二、新手必学:三种并发模式(附可运行代码)

1. 多进程:适合持续计算任务(CPU密集型)

适用于数据分析、图像处理或复杂计算等场景,能充分利用多核CPU,突破Python的GIL(全局解释器锁)限制。

核心原理:每个进程拥有独立的Python解释器和内存空间,实现真正的并行计算。

极简代码示例(计算大数平方和)

import multiprocessing
import time

def calculate_square_sum(n, process_name):
    print(f"[{time.strftime('%X')}] {process_name} 开始计算1-{n}的平方和")
    result = sum(i*i for i in range(1, n+1))  # 纯CPU计算
    print(f"[{time.strftime('%X')}] {process_name} 计算完成")
    return result

if __name__ == "__main__":
    start_time = time.time()

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        tasks = [
            (10000000, "进程1"),  # 1-1000万
            (20000000, "进程2"),  # 1-2000万
            (30000000, "进程3"),  # 1-3000万
            (40000000, "进程4")   # 1-4000万
        ]
        results = pool.starmap(calculate_square_sum, tasks)

    total_time = time.time() - start_time
    print(f"\n总耗时:{total_time:.2f}秒")
    print(f"各任务结果:{results}")

运行效果(4核CPU):四个进程同时启动,总耗时约等于最慢任务的时间(约12秒),相比串行执行(约48秒)效率提升3-4倍。

注意事项

  • 进程数不宜超过CPU核心数,避免切换开销
  • 进程间内存独立,需使用Queue或Pipe进行数据传递
2. 多线程:适合频繁等待任务(I/O密集型)

适用于网页爬取、文件读写或API调用等场景,能在等待网络响应或磁盘操作时切换到其他任务。

核心原理:多个线程共享进程内存,一个线程等待时其他线程可继续执行,但受GIL限制不适合纯计算。

极简代码示例(模拟网页下载)

import time
from concurrent.futures import ThreadPoolExecutor
import requests  # 需安装:pip install requests

def download_url(url, thread_name):
    print(f"[{time.strftime('%X')}] {thread_name} 开始下载:{url}")
    time.sleep(2)  # 模拟网络I/O等待
    print(f"[{time.strftime('%X')}] {thread_name} 下载完成")
    return f"{url} 下载成功"

if __name__ == "__main__":
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=5) as executor:
        urls = [
            "https://example.com/1", "https://example.com/2",
            "https://example.com/3", "https://example.com/4",
            "https://example.com/5"
        ]
        results = list(executor.map(download_url, urls, [f"线程{i+1}" for i in range(5)]))

    total_time = time.time() - start_time
    print(f"\n总耗时:{total_time:.2f}秒")
    print(f"结果:{results}")

运行效果:五个线程同时启动,总耗时约2秒(单个任务等待时间),相比串行执行(10秒)效率提升5倍。

注意事项

  • 避免用于纯计算任务,GIL会导致性能不佳
  • 线程数控制在10-100之间,过多会增加切换开销
  • 修改共享变量时需使用threading.Lock()防止数据混乱
3. 多协程:适合高并发任务(万级请求)

适用于Web服务器或大规模爬虫场景,一个线程可处理成千上万个协程,资源占用极低。

核心原理:协程为用户态轻量级线程,由Python自行调度,切换成本远低于操作系统线程。

极简代码示例(高并发I/O任务)

import asyncio
import time

async def async_task(task_id, delay):
    print(f"[{time.strftime('%X')}] 协程{task_id} 开始,等待{delay}秒")
    await asyncio.sleep(delay)  # 必须使用异步等待
    print(f"[{time.strftime('%X')}] 协程{task_id} 完成")
    return f"任务{task_id}成功"

async def main():
    start_time = time.time()

    tasks = [
        async_task(1, 2), async_task(2, 1), async_task(3, 3),
        async_task(4, 1), async_task(5, 2), async_task(6, 3),
        async_task(7, 1), async_task(8, 2), async_task(9, 3),
        async_task(10, 2)
    ]

    results = await asyncio.gather(*tasks)

    total_time = time.time() - start_time
    print(f"\n总耗时:{total_time:.2f}秒")
    print(f"所有结果:{results}")

if __name__ == "__main__":
    asyncio.run(main())

运行效果:十个协程同时启动,总耗时约3秒(最慢任务等待时间),相比串行执行(20秒)效率显著提升。

注意事项

  • 协程函数需用async def定义,调用时加await
  • 使用异步库(如aiohttp)替代同步库(如requests)
  • 不适合纯计算任务,需结合多进程使用

三、新手进阶:混合模式(多进程+多协程)

针对同时包含计算和I/O的任务(如“下载数据→计算分析→保存结果”),可采用多进程与多协程混合模式,兼顾多核利用和高并发I/O。

极简代码示例

import multiprocessing
import asyncio
import time

async def async_worker(task_id, data, process_name):
    print(f"[{time.strftime('%X')}] {process_name}-协程{task_id} 开始处理")
    await asyncio.sleep(1)  # 模拟I/O等待
    result = sum(i*i for i in data)  # 模拟计算
    print(f"[{time.strftime('%X')}] {process_name}-协程{task_id} 完成")
    return result

def process_worker(data_chunk, process_name):
    return asyncio.run(
        asyncio.gather(
            async_worker(1, data_chunk, process_name),
            async_worker(2, data_chunk, process_name)
        )
    )

if __name__ == "__main__":
    start_time = time.time()

    data_chunks = [
        list(range(1, 1001)), list(range(1001, 2001)),
        list(range(2001, 3001)), list(range(3001, 4001))
    ]

    with multiprocessing.Pool(processes=2) as pool:
        results = pool.starmap(process_worker, 
                              [(data_chunks[0], "进程1"), (data_chunks[1], "进程1"),
                               (data_chunks[2], "进程2"), (data_chunks[3], "进程2")])

    total_time = time.time() - start_time
    print(f"\n总耗时:{total_time:.2f}秒")
    print(f"最终结果:{sum(sum(r) for r in results)}")

四、新手选择指南:适用场景总结

  1. 纯计算任务(数据分析、图像处理)→ 多进程
  2. 普通I/O任务(几十至几百个网页爬取)→ 多线程
  3. 高并发I/O任务(万级API请求、Web服务器)→ 多协程
  4. 混合任务(下载+分析+保存)→ 多进程+多协程

五、核心知识点备忘

  1. GIL(全局解释器锁):限制多线程并行执行纯Python代码,纯计算需用多进程
  2. 异步库选择:协程必须使用异步库(aiohttp、asyncio),避免阻塞线程
  3. 资源对比:多进程 > 多线程 > 多协程(资源占用),多协程 > 多线程 > 多进程(并发能力)
  4. 选用原则:优先使用多线程解决I/O任务,多协程处理高并发,多进程应对计算密集型任务

小结:入门三步法

  1. 任务分析:判断任务属计算密集型还是I/O密集型
  2. 工具选择:计算用进程,I/O用线程,高并发用协程
  3. 渐进实践:从简单代码入手,逐步添加异常处理和结果汇总

掌握Python并发编程核心方法后,可显著提升程序执行效率,避免资源闲置。




上一篇:用户态线程多核限制深度解析:Java线程模型演进与Loom虚拟线程实战
下一篇:TodePond / GulfOfMexico:一个用反常规设计探索编程语言边界的开源项目
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-7 05:45 , Processed in 0.090311 second(s), 36 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表