找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2565

积分

0

好友

358

主题
发表于 13 小时前 | 查看: 0| 回复: 0

在 Rust 异步生态中,我们可以使用 tokio::spawn 配合 channel 来处理并发任务。然而,当任务逻辑变得复杂时,这种指令式的写法往往会导致代码臃肿,且难以精确控制并发粒度和背压。今天我们将通过 StreamExt 提供的声明式并发原语实现一个多线程文件下载器,感受函数式组合子带来的威力。

异步组合子

异步组合子是作用于 FutureStream 上的高阶函数。它以函数式的声明式编程方式实现 任务调度与业务逻辑的分离。这避免了业务逻辑代码的混乱,显著提高了可维护性。

  • FutureExt:是对单一异步单元的增强。在我们调用 await 之前,可以通过 .then().map().inspect() 定义任务完成后的流水线操作。
  • StreamExt:是对异步 Stream 的增强,是实现并发控制的核心,提供了如 .buffer_unordered().for_each_concurrent() 等方法。

与传统的显式循环相比,组合子利用 Rust 的所有权系统和状态机模型,在编译期就确定了任务的流转方式,消除了运行时的逻辑歧义。

代码实践:高并发分片下载器

引入依赖

FutureExtStreamExt trait 都来自于 futures crate,而 indicatif crate 用来美化下载进度条显示。reqwest 是 HTTP 请求客户端。tokio 则是 Rust 生态异步运行时的事实标准。

reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
indicatif = "0.18"

main 函数

利用 StreamExtbuffer_unordered 函数来实现并发下载。通过 HTTP Range 头来请求文件的不同分片,最后并发写入磁盘。

use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::header::{CONTENT_LENGTH, RANGE};
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

const CONCURRENCY: usize = 8;
const CHUNK_SIZE: u64 = 1024 * 512;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "https://releases.ubuntu.com/22.04/ubuntu-22.04.5-desktop-amd64.iso.zsync";
    let file_path = "downloaded_file.zsync";
    let client = reqwest::Client::new();

    let response = client.head(url).send().await?;
    let total_size = response
        .headers()
        .get(CONTENT_LENGTH)
        .and_then(|ct| ct.to_str().ok()?.parse::<u64>().ok())
        .expect("无法获取文件大小");

    let pb = ProgressBar::new(total_size);
    // {msg} 占位符用于在最后显示信息
    // [elapsed_precise] 显示总耗时
    pb.set_style(ProgressStyle::default_bar()
        .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({msg})")?
        .progress_chars("#>-"));

    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .open(file_path)
        .await?;
    file.set_len(total_size).await?;
    let file_handle = Arc::new(tokio::sync::Mutex::new(file));

    // 切分任务
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < total_size {
        let end = std::cmp::min(start + CHUNK_SIZE - 1, total_size - 1);
        chunks.push((start, end));
        start += CHUNK_SIZE;
    }

    let pb_clone = pb.clone();

    futures::stream::iter(chunks)
        .map(|(start, end)| {
            let client = client.clone();
            let file_handle = Arc::clone(&file_handle);
            let pb = pb_clone.clone();

            async move {
                let range = format!("bytes={}-{}", start, end);
                let data = client
                    .get(url)
                    .header(RANGE, range)
                    .send()
                    .await?
                    .bytes()
                    .await?;

                let chunk_len = data.len() as u64;

                // 写入文件
                let mut file = file_handle.lock().await;
                file.seek(std::io::SeekFrom::Start(start)).await?;
                file.write_all(&data).await?;

                // 更新进度条
                pb.inc(chunk_len);

                Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
            }
        })
        .buffer_unordered(CONCURRENCY)
        .for_each(|res| async move {
            if let Err(e) = res {
                eprintln!("\n分片下载出错: {}", e);
            }
        })
        .await;

    // 设置下载完成消息提示
    pb.set_message("下载成功!");
    println!("\n✨ 所有分片已合并!文件保存至: {}", file_path);
    Ok(())
}

CONCURRENCY: 并发数设置,通过 buffer_unordered 控制背压,现在的程序是 8 个连接同时下载
CHUNK_SIZE: 每个分片 512KB
client.head(): 注意⚠️这里是构建的 head 请求来先获取文件的总大小
ProgressBar::new: 使用 Indicatif 初始化进度条
file_handle...: 预创建文件并使用 Arc 包裹文件句柄,以便在多个异步任务间共享
pb.clone(): ProgressBar 内部也是使用 Arc,克隆是轻量级的
futures::stream::iter(chunks): 构建并发下载流
async move {...}: 为每个分片创建一个下载 Future 用来处理耗时的网络请求和返回。而在写入文件的时候是先获取锁再写入的。每一个任务写入完成后更新进度条

运行效果

终端输出如下,项目根目录下会生成 downloaded_file.zsync 文件

⠒ [00:00:05] [##########################################################################################################################################################################] 9.98 MiB/9.98 MiB (下载成功!)
✨ 所有分片已合并!文件保存至: downloaded_file.zsync

在终端显示中进度条是绿色的

buffer_unordered 原理

调度机制

当流被拉取时,buffer_unordered 会尝试从上游获取 Future 并将其放入内部的“就绪缓冲区”。

  • 维持最多 n 个 Future 同时处于等待状态。
  • 非阻塞产出:一旦其中任何一个分片下载完成,该组合子立即产出结果,并从上游拉取下一个分片补充。这避免了因为某个慢速连接而阻塞其他已完成分片的处理逻辑。

背压支持

如果下游的 for_each 逻辑(如复杂的解密或磁盘写入)由于某种原因变慢,buffer_unordered 会停止从上游迭代器拉取新的分片任务。这种拉取驱动的机制保护了系统资源,不会因过度创建网络连接而导致内存溢出。

原子性写入与并发锁

虽然我们使用了 Arc<Mutex<File>>,但在 I/O 密集型任务中,Mutex 锁竞争的开销被忽略不计,因为长久的耗时是发生在网络请求中。

  1. 网络下载是耗时最长部分,是在锁之外并发执行的。
  2. 锁内逻辑仅包含 seekwrite。 对于操作系统的文件系统缓存而言,这种串行写入开销极低,而网络 I/O 的并行收益远大于锁的开销。

总结

至此,我们使用 StreamExt 异步组合子的特性再配合 Rust 的所有权系统优雅地实现了可预测的多线程并发。这种声明式编程范式将任务调度与业务逻辑清晰地解耦,使代码更简洁、更易维护,同时也减少了潜在的并发错误。相较于传统的指令式并发控制,这种方式确实省心省力。

希望这篇实战教程能帮助你更好地理解 Rust 异步编程的魅力。欢迎在 云栈社区 分享你的实践心得或提出疑问。

Happy Coding with Rust🦀!




上一篇:手把手教你用OpenCode CLI进行AI编程:快速生成登录注册页面
下一篇:RustFS高性能分布式存储实战:兼容S3、国产化与性能对比分析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-25 18:20 , Processed in 0.312341 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表