在 Rust 异步生态中,我们可以使用 tokio::spawn 配合 channel 来处理并发任务。然而,当任务逻辑变得复杂时,这种指令式的写法往往会导致代码臃肿,且难以精确控制并发粒度和背压。今天我们将通过 StreamExt 提供的声明式并发原语实现一个多线程文件下载器,感受函数式组合子带来的威力。
异步组合子
异步组合子是作用于 Future 和 Stream 上的高阶函数。它以函数式的声明式编程方式实现 任务调度与业务逻辑的分离。这避免了业务逻辑代码的混乱,显著提高了可维护性。
- FutureExt:是对单一异步单元的增强。在我们调用
await 之前,可以通过 .then()、.map() 或 .inspect() 定义任务完成后的流水线操作。
- StreamExt:是对异步
Stream 的增强,是实现并发控制的核心,提供了如 .buffer_unordered()、.for_each_concurrent() 等方法。
与传统的显式循环相比,组合子利用 Rust 的所有权系统和状态机模型,在编译期就确定了任务的流转方式,消除了运行时的逻辑歧义。
代码实践:高并发分片下载器
引入依赖
FutureExt 和 StreamExt trait 都来自于 futures crate,而 indicatif crate 用来美化下载进度条显示。reqwest 是 HTTP 请求客户端。tokio 则是 Rust 生态异步运行时的事实标准。
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
indicatif = "0.18"
main 函数
利用 StreamExt 中 buffer_unordered 函数来实现并发下载。通过 HTTP Range 头来请求文件的不同分片,最后并发写入磁盘。
use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::header::{CONTENT_LENGTH, RANGE};
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
const CONCURRENCY: usize = 8;
const CHUNK_SIZE: u64 = 1024 * 512;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = "https://releases.ubuntu.com/22.04/ubuntu-22.04.5-desktop-amd64.iso.zsync";
let file_path = "downloaded_file.zsync";
let client = reqwest::Client::new();
let response = client.head(url).send().await?;
let total_size = response
.headers()
.get(CONTENT_LENGTH)
.and_then(|ct| ct.to_str().ok()?.parse::<u64>().ok())
.expect("无法获取文件大小");
let pb = ProgressBar::new(total_size);
// {msg} 占位符用于在最后显示信息
// [elapsed_precise] 显示总耗时
pb.set_style(ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({msg})")?
.progress_chars("#>-"));
let file = OpenOptions::new()
.write(true)
.create(true)
.open(file_path)
.await?;
file.set_len(total_size).await?;
let file_handle = Arc::new(tokio::sync::Mutex::new(file));
// 切分任务
let mut chunks = Vec::new();
let mut start = 0;
while start < total_size {
let end = std::cmp::min(start + CHUNK_SIZE - 1, total_size - 1);
chunks.push((start, end));
start += CHUNK_SIZE;
}
let pb_clone = pb.clone();
futures::stream::iter(chunks)
.map(|(start, end)| {
let client = client.clone();
let file_handle = Arc::clone(&file_handle);
let pb = pb_clone.clone();
async move {
let range = format!("bytes={}-{}", start, end);
let data = client
.get(url)
.header(RANGE, range)
.send()
.await?
.bytes()
.await?;
let chunk_len = data.len() as u64;
// 写入文件
let mut file = file_handle.lock().await;
file.seek(std::io::SeekFrom::Start(start)).await?;
file.write_all(&data).await?;
// 更新进度条
pb.inc(chunk_len);
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
})
.buffer_unordered(CONCURRENCY)
.for_each(|res| async move {
if let Err(e) = res {
eprintln!("\n分片下载出错: {}", e);
}
})
.await;
// 设置下载完成消息提示
pb.set_message("下载成功!");
println!("\n✨ 所有分片已合并!文件保存至: {}", file_path);
Ok(())
}
CONCURRENCY: 并发数设置,通过 buffer_unordered 控制背压,现在的程序是 8 个连接同时下载
CHUNK_SIZE: 每个分片 512KB
client.head(): 注意⚠️这里是构建的 head 请求来先获取文件的总大小
ProgressBar::new: 使用 Indicatif 初始化进度条
file_handle...: 预创建文件并使用 Arc 包裹文件句柄,以便在多个异步任务间共享
pb.clone(): ProgressBar 内部也是使用 Arc,克隆是轻量级的
futures::stream::iter(chunks): 构建并发下载流
async move {...}: 为每个分片创建一个下载 Future 用来处理耗时的网络请求和返回。而在写入文件的时候是先获取锁再写入的。每一个任务写入完成后更新进度条
运行效果
终端输出如下,项目根目录下会生成 downloaded_file.zsync 文件
⠒ [00:00:05] [##########################################################################################################################################################################] 9.98 MiB/9.98 MiB (下载成功!)
✨ 所有分片已合并!文件保存至: downloaded_file.zsync
在终端显示中进度条是绿色的
buffer_unordered 原理
调度机制
当流被拉取时,buffer_unordered 会尝试从上游获取 Future 并将其放入内部的“就绪缓冲区”。
- 维持最多
n 个 Future 同时处于等待状态。
- 非阻塞产出:一旦其中任何一个分片下载完成,该组合子立即产出结果,并从上游拉取下一个分片补充。这避免了因为某个慢速连接而阻塞其他已完成分片的处理逻辑。
背压支持
如果下游的 for_each 逻辑(如复杂的解密或磁盘写入)由于某种原因变慢,buffer_unordered 会停止从上游迭代器拉取新的分片任务。这种拉取驱动的机制保护了系统资源,不会因过度创建网络连接而导致内存溢出。
原子性写入与并发锁
虽然我们使用了 Arc<Mutex<File>>,但在 I/O 密集型任务中,Mutex 锁竞争的开销被忽略不计,因为长久的耗时是发生在网络请求中。
- 网络下载是耗时最长部分,是在锁之外并发执行的。
- 锁内逻辑仅包含
seek 和 write。 对于操作系统的文件系统缓存而言,这种串行写入开销极低,而网络 I/O 的并行收益远大于锁的开销。
总结
至此,我们使用 StreamExt 异步组合子的特性再配合 Rust 的所有权系统优雅地实现了可预测的多线程并发。这种声明式编程范式将任务调度与业务逻辑清晰地解耦,使代码更简洁、更易维护,同时也减少了潜在的并发错误。相较于传统的指令式并发控制,这种方式确实省心省力。
希望这篇实战教程能帮助你更好地理解 Rust 异步编程的魅力。欢迎在 云栈社区 分享你的实践心得或提出疑问。
Happy Coding with Rust🦀!