找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3215

积分

0

好友

453

主题
发表于 昨天 07:23 | 查看: 3| 回复: 0

用PostgreSQL替代BullMQ技术宣言图

上个月,我帮一家创业公司调试他们的“任务队列”。结果发现,那套复杂的系统,本质上只是在用 Redis 每月烧掉 200 美元,来处理每天区区 47 封邮件。

我问他们,当初为什么选择 Bull + Redis 这套方案。他们的回答很有意思:“我们看到 Uber 用 Kafka。”

这种事其实屡见不鲜。一个用户量可能才 500 的产品,就开始设计 Redis 集群、琢磨任务优先级、排查连接池问题,仿佛默认自己明天就会变成一家“日处理几十亿事件”的巨头公司。

但真正的成本,往往不是服务器账单,而是 认知负担。每一个新加入的开发者,都需要先理解你那套复杂的队列拓扑;每一次部署,都要祈祷 Redis 服务正常;每一次排查线上 Bug,都不得不在多个系统中进行分布式追踪。

很多时候,我们是在用 未来或许会遇到的问题,来交换 眼下实实在在的复杂性。而残酷的现实是,你现在可能根本不需要去解决那个“未来”的问题。

真正有效的做法:用数据库当队列,直到它真的撑不住

别急着上 Redis,也别急着引入 Bull。先用你手头就有的 数据库

你的数据库,本来就能当队列

PostgreSQL 为例,它每天处理 10 万个以上 的异步任务通常毫无压力。关键它已经在你系统中运行了,有成熟的备份方案,团队里的每个人都熟悉它。

它的失败模式简单,调试工具也足够成熟。很多时候,你并不需要一个专门的队列服务。你需要的,仅仅是一张 jobs 表,再加一个 worker 进程。

就这么简单。

实现方式

Step 1:创建 jobs 表

CREATE TABLE jobs (
  id SERIAL PRIMARY KEY,
  job_type VARCHAR(100) NOT NULL,
  payload JSONB NOT NULL,
  status VARCHAR(20) DEFAULT 'pending',
  attempts INT DEFAULT 0,
  max_attempts INT DEFAULT 3,
  scheduled_at TIMESTAMP DEFAULT NOW(),
  created_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  error TEXT
);

CREATE INDEX idx_jobs_pending ON jobs(status, scheduled_at)
  WHERE status = 'pending';

这张简单的表已经覆盖了队列的核心需求:重试延迟执行错误记录。这基本上就是 Bull 这类库提供的一整套功能。

最大的区别在于,当你想查看失败任务时,可以直接运行 SELECT * FROM jobs WHERE status = 'failed',直观且无需额外工具。

Step 2:写 worker

下面是一个用 Node.js 实现的 Worker 示例,核心逻辑大约 60 行。

const { Pool } = require('pg');

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
});

async function claimJob(client) {
  const result = await client.query(`
    UPDATE jobs
    SET status = 'processing'
    WHERE id = (
      SELECT id FROM jobs
      WHERE status = 'pending' 
        AND scheduled_at <= NOW()
      ORDER BY scheduled_at
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id, job_type, payload
  `);

  return result.rows[0];
}

async function processJob(job) {
  switch (job.job_type) {
    case 'send_email':
      await sendEmail(job.payload.to, job.payload.subject);
      break;
    case 'process_upload':
      await processUpload(job.payload.file_id);
      break;
  }
}

async function markComplete(client, jobId) {
  await client.query(
    `UPDATE jobs 
     SET status = 'completed', completed_at = NOW()
     WHERE id = $1`,
    [jobId]
  );
}

async function markFailed(client, jobId, error) {
  await client.query(
    `UPDATE jobs 
     SET attempts = attempts + 1,
         status = CASE 
           WHEN attempts + 1 >= max_attempts THEN 'failed'
           ELSE 'pending'
         END,
         scheduled_at = CASE
           WHEN attempts + 1 < max_attempts 
           THEN NOW() + (POW(2, attempts) * INTERVAL '1 minute')
           ELSE scheduled_at
         END,
         error = $1
     WHERE id = $2`,
    [error, jobId]
  );
}

async function runWorker() {
  while (true) {
    const client = await pool.connect();

    try {
      const job = await claimJob(client);

      if (!job) {
        client.release();
        await new Promise(resolve => setTimeout(resolve, 1000));
        continue;
      }

      try {
        await processJob(job);
        await markComplete(client, job.id);
      } catch (error) {
        await markFailed(client, job.id, error.message);
        console.error(`Job ${job.id} failed:`, error);
      }

      client.release();
    } catch (error) {
      client.release();
      console.error('Worker error:', error);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

runWorker();

核心只有 60 行左右。没有外部队列服务,没有复杂的 Redis 配置,也不需要额外的监控面板。

Step 3:在 API 里入队

在业务接口中,插入一条记录就相当于入队。

async function uploadDocument(req, res) {
  const fileId = await saveFile(req.file);

  await pool.query(
    `INSERT INTO jobs (job_type, payload)
     VALUES ($1, $2)`,
    ['process_upload', JSON.stringify({ file_id: fileId })]
  );

  res.json({ status: 'processing', file_id: fileId });
}

当出现问题时,你直接查询 jobs 表。想要重试某个任务?直接更新它的状态字段。凌晨两点排查问题,你用的是最熟悉的 SQL 语句,而不是去努力回忆某个队列库特有的重试语义和晦涩日志。

你到底避免了什么?

1. 不要用 Redis 做持久化队列
Redis 是缓存领域的王者,但作为持久化队列,它并不理想。如果 Worker 进程崩溃,正在处理的任务很可能会丢失。当然你可以开启 AOF 和 RDB 持久化,但随之而来的是对快照频率、数据丢失窗口的额外担忧。这是在为“未来也许有的规模”,提前引入真实且确定的运维风险。

2. 不要在没有证据前就引入 Bull 这类库
引入这些库意味着:新的依赖、新的配置项、新的一层抽象。而上文的 60 行代码,是你和你的团队 完全能够读懂、掌控和修改 的逻辑。

3. 不要为了 Jobs 单独搞一个微服务
就让 Worker 进程跑在应用同一台或同一组机器上。用 PM2、Docker Compose 或者 systemd 来管理它。每一个新拆分的服务,都意味着一套新的部署流程、一套新的监控指标和一套新的故障模式。在初期,这带来的复杂度远超其收益。

4. 不要“以防万一”而过度设计
你可能永远也用不到那些复杂功能:精细的任务优先级、复杂的重试策略(如指数退避加抖动)、任务链(job chaining)等。等到你的业务真的需要它们时,你会有非常明确的需求和数据支撑,那时再升级也不迟。

什么时候这个方案真的不行了?

当然,这个方案并非银弹。它可能在以下场景中遇到瓶颈:

  • 吞吐量:需要每秒处理上万个任务时,数据库的轮询开销会变得显著。
  • 延迟:要求任务必须被毫秒级拾取并执行的场景。
  • 已有基础设施:你的系统中已经稳定运行着 Redis 或 RabbitMQ 等消息中间件。
  • 高级模式:需要多消费者组、精确一次(exactly-once)投递、复杂的发布/订阅(fanout)等高级特性。

但重要的是,当你真的走到那一步时,你将拥有:

  • 明确的使用数据和性能指标。
  • 清晰的任务模型和业务需求。
  • 足够的工程和运维能力。

那时,再引入一个真正的消息队列,是一次基于数据的“升级”,而非基于技术潮流的“信仰”。

最后一点

你的数据库 本身就是一个可靠的队列。它持久、支持事务、工具链成熟。请先把它用好。

朴素的方案可能不够“性感”,但它今天能稳定运行,并且有很大概率两年后依然在可靠地工作。真正的系统扩展,往往是在用户和需求增长之后才需要深入考虑的。先把简单、可控的东西做出来并上线,等到问题真切地浮出水面时,再有针对性地解决它。

希望这篇实践指南能帮助你简化技术栈。如果你对更多后端架构或数据库的实战技巧感兴趣,可以关注 云栈社区 的相关板块,那里有更多开发者的经验分享。




上一篇:VirtualBox 7.2.6 发布,初步支持Linux 6.19内核并修复大量稳定性问题
下一篇:RabbitMQ 与 FastAPI 集成利器:Python 工具库 rabbit-manager 简化开发与自动重连
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-5 00:30 , Processed in 1.320276 second(s), 44 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表