找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4062

积分

0

好友

562

主题
发表于 昨天 03:53 | 查看: 9| 回复: 0

想象这样一个场景:

你刚转正没多久,Leader甩给你一个任务:“那个订单统计任务最近老延迟,你看一下。”

你心想,这还不简单?打开XXL-JOB控制台,找到那个任务,点了下“执行一次”——没反应。再点,还是没反应。看日志?日志里干干净净,连个ERROR都没有。

你慌了。

去问隔壁老张,老张瞥了一眼:“哦,那个任务啊,有时候就这样,你重启一下执行器试试。”

你重启了。好了。

第二天,又挂了。

老张又说:“你再重启一下。”

第三天,你实在受不了了。这哪是搞技术,这分明是当重启工程师啊!你一咬牙,花了两天时间,把XXL-JOB的源码从头到尾捋了一遍。看完之后你只想说一句话:原来我一直是个“调参侠”。

今天我就带着大家把XXL-JOB的核心机制彻底扒一遍。

一、先唠唠:XXL-JOB到底是什么?

简单来说,XXL-JOB就是个“包工头”。

以前我们用Spring的 @Scheduled 写定时任务,简单粗暴。但有个致命问题:如果你的项目部署了3个实例,到了预设时间点,三个实例会同时启动同一个“同步数据”的任务。结果数据被重复处理三次,第二天肯定被产品经理拎着耳朵骂。

那自己搭Quartz集群呢?配置复杂,还得处理分布式锁、失败重试、监控告警……折腾半个月,稳定性还不一定有保障。

XXL-JOB 的出现就是为了解决这些痛点。它巧妙地将“何时触发”与“谁去执行”这两件事拆分开:

  • 调度中心:相当于“包工头”,负责看表,到点了就喊人干活。
  • 执行器:就是“干活的”,真正执行业务逻辑,包工头喊谁谁动。

你只需要做四步:搭建调度中心、项目里集成执行器、在调度中心配置任务、编写业务代码。听起来简单,但背后的问题才关键:包工头怎么知道张三还活着?到点怎么喊人?张三干到一半死了怎么办?这些才是XXL-JOB真正的门道。

二、第一扒:执行器如何与调度中心“接头”?(注册发现机制)

调度中心要给执行器发任务,首先得认识它。答案是:执行器主动去抱大腿

2.1 报到:向调度中心注册

你的项目一启动,执行器就会主动向调度中心发起注册。报到时说三件事:

  1. 我叫啥:比如 order-executor,调度中心靠这个给执行器分组。
  2. 我住哪儿:比如 http://192.168.1.100:9999/xxl-job-executor,任务就往这个地址发。
  3. 我的门牌号:IP和端口,作为备用标识。

报到动作本质上是一个HTTP POST请求,发往调度中心的 /api/registry 接口。调度中心收到后,会把信息记录在数据库的 xxl_job_registry 表里,关键字段有 registry_group(区分执行器还是调度中心)、registry_key(执行器名)、registry_value(地址)、update_time(最后报到时间)。

2.2 刷脸:心跳保活

报到完就没事了?不行。执行器必须让调度中心知道自己一直“活着”。所以它每隔30秒(默认配置 xxl.job.executor.heartbeat.interval=30000)会再次调用 /api/registry 接口,刷新自己的 update_time,这就是心跳。

2.3 踢人:下线检测

调度中心这边,有个后台线程每隔60秒(默认 xxl.job.registry.check.interval=60000)扫描一次 xxl_job_registry 表,如果发现某个执行器的 update_time 超过90秒没更新,就认为它死了,直接删除记录,后续任务不再发给它。

💥 常见坑点:执行器明明活着,调度中心却显示“离线”

大概率是这两个原因:

  1. 心跳发不出去:调度中心地址配错了(比如多了个斜杠),或者网络不通、防火墙阻拦。
  2. 检测线程出问题:数据库连接挂了,或者检测间隔配得太小。

排查步骤

  • 查看执行器日志,搜索 registry success
  • 直接查数据库 xxl_job_registry 表,看记录是否存在且 update_time 是否最新。
  • telnet 命令测试调度中心的端口是否畅通。

相关源码(XxlJobRegistryHelper类)

// 调度中心的心跳检测线程
registryMonitorThread = new Thread(() -> {
    while (!registryMonitorThread.isInterrupted()) {
        try {
            TimeUnit.SECONDS.sleep(60); // 默认60秒扫一次
            // 清理90秒内没更新心跳的离线机器
            int offset = 90 / 60;
            // delete from xxl_job_registry where update_time < 当前时间-90秒
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
});

三、第二扒:任务到点,如何精准触发?

这是XXL-JOB的核心。你配置了一个CRON表达式,比如“0 0 3 ?”(每天凌晨3点),调度中心如何准时触发?这里有两个关键角色:调度线程触发线程池

3.1 任务存储与时间轮

你配置的任务信息都存在 xxl_job_info 表里。调度中心启动后,会把这些任务加载到内存。面对成千上万的任务,不可能每个任务都用一个线程盯着,太浪费资源。XXL-JOB采用了“时间轮”算法。

可以把时间轮想象成一个60格的表盘,每格代表1秒。调度中心根据任务的下次执行时间,将其放入对应的格子。比如现在是2:59:58,一个任务3:00:00执行,就放到“2秒后”的格子里。时间轮每秒转一格,转到哪个格子,就把里面的任务取出来准备触发。这种方式比每秒全表扫描高效得多。

3.2 快慢线程池:隔离慢任务

时间轮取出的任务,会被提交到触发线程池。这里有个巧妙设计:XXL-JOB准备了两个线程池——一个快,一个慢。

如果一个任务在1分钟内被提交超过10次(触发过于频繁),就会被判定为“慢任务”,自动路由到慢线程池执行。慢线程池的线程数更少,队列更长,这样慢任务就不会拖垮快线程池,影响其他正常任务的触发。

默认配置

  • 快线程池:最大线程数200,队列长度1000。
  • 慢线程池:最大线程数100,队列长度2000。

3.3 触发任务:HTTP调用执行器

触发线程拿到任务后,会通过HTTP调用执行器的 /run 接口,传递关键参数:jobIdexecutorHandler(任务处理器名)、executorParams(参数)、logIdexecutorTimeoutexecutorBlockStrategy(阻塞策略)等。

💥 常见坑点:任务到点了没触发?

如果执行器在线,可以检查以下几点:

  1. CRON表达式错误:调度中心配置页面有“CRON验证”功能,务必点一下看看下次执行时间是否符合预期。
  2. 任务被暂停:检查任务状态是否为“正常”。
  3. 触发线程池满了:查看调度中心日志是否有 trigger thread pool is full,需要调大线程池参数或优化任务频率。
  4. 调度线程卡住:尝试重启调度中心。

相关源码(JobScheduleHelper类)

// 时间轮触发逻辑
ringThread = new Thread(() -> {
    while (!ringThread.isInterrupted()) {
        try {
            // 获取当前秒数
            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
            // 为了避免处理耗时太长跨过刻度,向前校验一个刻度
            List<Integer> ringItemData = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                if (tmpData != null) {
                    ringItemData.addAll(tmpData);
                }
            }
            // 触发任务
            for (int jobId : ringItemData) {
                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
            }
        } catch (Exception e) {
            logger.error(“ringThread error:{}”, e.getMessage());
        }
    }
});

四、第三扒:执行器收到任务后如何执行?

调度中心“派活”,执行器开始“干活”。

4.1 接单与执行线程池

执行器收到 /run 请求后,先做基本验证,然后将任务封装,并提交到自己的执行线程池。这个线程池的配置项如下:

  • xxl.job.executor.threadPool.coreSize:核心线程数,默认10。
  • xxl.job.executor.threadPool.maxSize:最大线程数,默认200。
  • queueSize:队列大小,默认2000。

注意:执行器的线程池和调度中心的触发线程池是两回事。

4.2 阻塞策略:任务积压怎么办?

如果执行器线程池和队列都满了,新任务怎么处理?这取决于你在调度中心配置的阻塞策略

策略 行为 适用场景
单机串行(默认) 新任务排队,等前面的任务执行完 任务不能丢,允许等待
丢弃后续调度 新任务直接丢弃,不执行 周期性任务,允许偶尔丢弃
覆盖之前调度 丢弃队列里较旧的任务,执行新任务 必须执行最新数据
失败转移 将任务发给同组其他健康的执行器 任务必须执行,要求高可用

4.3 日志记录与超时控制

执行线程会反射调用对应的 @XxlJob 方法。日志会先写到本地文件,再异步上报到调度中心入库,这样做是为了防止网络问题导致日志丢失。

如果任务执行超时,执行器会强制中断任务线程。因此,在编写包含循环的任务时,应做好中断判断:

@XxlJob(“orderSyncJob”)
public void orderSyncJob() throws Exception {
    while (true) {
        // 关键:判断线程是否被中断
        if (Thread.currentThread().isInterrupted()) {
            XxlJobLogger.log(“任务被超时杀死,退出执行”);
            return;
        }
        // 业务逻辑
        syncOrder();
        Thread.sleep(1000);
    }
}

4.4 结果回调

任务执行完毕后(无论成功失败),执行器都会回调调度中心的 /api/callback 接口,上报 jobIdlogId、执行结果和错误信息。调度中心据此更新日志状态,并根据配置决定是否重试。

💥 常见坑点:任务明明成功了,调度中心却显示失败?

排查思路:

  1. 回调失败:查看执行器日志是否有 callback success。检查调度中心地址和网络。
  2. 被超时杀死:检查执行日志是否有 timeout kill 记录。
  3. 业务异常:任务代码抛出了未捕获的异常。
  4. 线程池满,任务被丢弃:调大执行器线程池参数。

五、第四扒:执行器集群,任务如何分发?(路由策略)

当执行器集群部署时,调度中心依据路由策略决定将任务发给谁。XXL-JOB提供了9种策略,这里介绍几个常用的:

  1. 轮询:依次发给每个实例,公平简单。适合实例配置相同、任务耗时均匀的场景。
  2. 随机:随机选择一个实例。能一定程度上避免轮询可能导致的“慢实例堆积”问题。
  3. 一致性哈希:基于 jobId 哈希,同一任务永远发给同一实例。适合需要状态一致性的任务(如处理某个数据范围)。
  4. 最少活跃数:发给当前正在执行任务最少的实例。能实现动态负载均衡。
  5. 故障转移:按顺序发送,失败则自动换下一个实例重试。适合高可用场景。
  6. 分片广播最复杂也最强大,任务会发给所有实例,每个实例都执行,通常用于大数据处理并行计算。每个实例可以通过 XxlJobHelper.getShardIndex()getShardTotal() 获取自己的分片序号和总分片数,从而处理不同的数据子集。

如何选择路由策略?

  • 通用场景:轮询
  • 任务有状态:一致性哈希
  • 实例负载不均:最少活跃数
  • 任务必须执行:故障转移
  • 海量数据处理:分片广播

分片广播相关源码(XxlJobTrigger类)

// 分片广播的处理逻辑
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == strategyEnum 
        && group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
    for (int i = 0; i < group.getRegistryList().size(); i++) {
        // i是当前机器索引,size是总分片数
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
    }
}

六、第五扒:如何保证调度中心集群下任务不重复执行?

调度中心也集群部署时,如何避免多个“包工头”同时触发同一个任务?XXL-JOB的解决方案是:数据库悲观锁

  1. 调度中心在触发任务前,会尝试向 xxl_job_lock 表插入一条 lock_nameschedule_lock 的记录。
  2. 由于 lock_name 是唯一索引,同一时刻只有一个调度中心节点能插入成功(获得锁)。
  3. 获得锁的节点执行扫表、触发任务等操作。
  4. 操作完成后提交事务,锁释放。

相关源码(JobScheduleHelper类)

// 获取锁
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// 悲观锁,锁定记录
preparedStatement = conn.prepareStatement(“select * from xxl_job_lock where lock_name = ‘schedule_lock’ for update”);
preparedStatement.execute();
// 扫表、发任务...
// 提交事务,释放锁
conn.commit();

💥 常见坑点:任务还是重复执行了?

如果排除了分布式锁的问题,可能是:

  1. 路由策略选错:误用了“分片广播”,导致所有实例都执行。应改为“轮询”等。
  2. 失败重试导致:任务失败后,根据配置重试了多次。

重要提醒:XXL-JOB能避免“同一调度周期内”的重复执行,但无法避免“不同时间”的重复执行(如失败重试)。因此,业务层必须自行实现幂等性处理,例如使用唯一业务ID、Redis锁等。

七、实战:手写一个分片广播任务

理论懂了,来点实战代码。我们写一个处理用户数据的分片任务。

7.1 添加依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.0</version>
</dependency>

7.2 配置执行器

@Configuration
public class XxlJobConfig {
    @Value(“${xxl.job.admin.addresses}“)
    private String adminAddresses;
    @Value(“${xxl.job.executor.appname}“)
    private String appname;
    @Value(“${xxl.job.executor.port}“)
    private int port;
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppname(appname);
        executor.setPort(port);
        executor.setLogRetentionDays(30);
        return executor;
    }
}

配置文件 application.yml

xxl:
  job:
    admin:
      addresses: http://localhost:8080/xxl-job-admin
    executor:
      appname: user-executor
      port: 9999

7.3 编写分片任务逻辑

@Component
public class UserDataJob {
    @XxlJob(“userDataShardingJob”)
    public void shardingJob() {
        // 1. 获取分片参数
        int shardIndex = XxlJobHelper.getShardIndex(); // 当前分片序号,从0开始
        int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数
        XxlJobHelper.log(“当前分片:{}/{}“, shardIndex + 1, shardTotal);
        // 2. 模拟获取所有待处理数据ID (例如从数据库查询)
        List<Long> allUserIds = getAllUserIds();
        // 3. 根据分片参数,计算本实例应处理的数据
        List<Long> myUserIds = new ArrayList<>();
        for (int i = 0; i < allUserIds.size(); i++) {
            // 简单取模分片
            if (i % shardTotal == shardIndex) {
                myUserIds.add(allUserIds.get(i));
            }
        }
        XxlJobHelper.log(“本分片需要处理 {} 个用户“, myUserIds.size());
        // 4. 处理本分片的数据
        int successCount = 0;
        int failCount = 0;
        for (Long userId : myUserIds) {
            try {
                processUser(userId); // 实际业务处理
                successCount++;
                // 进度日志
                if (successCount % 100 == 0) {
                    XxlJobHelper.log(“已处理 {} 个用户“, successCount);
                }
            } catch (Exception e) {
                failCount++;
                XxlJobHelper.log(“处理用户 {} 失败: {}“, userId, e.getMessage());
            }
        }
        // 5. 记录结果
        XxlJobHelper.log(“分片处理完成,成功:{},失败:{}“, successCount, failCount);
    }
    // … getAllUserIds() 和 processUser() 模拟方法省略
}

7.4 调度中心配置

在XXL-JOB控制台创建任务:

  • 执行器:选择 user-executor
  • JobHandler:填写 userDataShardingJob
  • 路由策略分片广播
  • CRON:例如 0 0 3 * * ? (每天凌晨3点)

当有3个执行器实例时,10万用户数据会被大致均分,三个实例并行处理,效率大幅提升。

八、调优建议

8.1 调度中心调优

  1. 集群部署:至少2节点,通过Nginx负载均衡,连接同一数据库。
  2. 调整触发线程池:高频率触发场景,调大 xxl.job.trigger.core.pool.size
  3. 数据库优化:为 xxl_job_infoxxl_job_log 表的关键字段(如 job_group, trigger_status, job_id, trigger_time)建立索引,并定期归档历史日志。
  4. 关闭无用功能:如不使用GLUE模式,可在配置中关闭。

8.2 执行器调优

  1. 合理部署实例数:根据任务量和处理能力决定。
  2. 优化执行线程池
    • 短任务(<1秒):增大 coreSize(如20-50),减小 queueSize
    • 长任务(>10秒):控制 coreSize(如10-20),增大 queueSize
  3. 日志与资源:调大日志上传线程池(若日志量大),并配置本地日志保留天数,避免磁盘占满。

8.3 任务本身调优

  1. 大任务拆分:执行超过10分钟的任务应考虑拆分子任务。
  2. 避免资源竞争:多个任务操作同一资源时,错开执行时间或使用串行策略。
  3. 实现幂等性:这是必须的。
  4. 精简日志:避免在循环内打印大对象,只记录关键节点和异常。

九、总结

扒完源码,你会发现XXL-JOB的核心机制并不神秘,主要就是六点:

  1. 注册发现:心跳保活与定时清理,维持集群状态。
  2. 任务触发:时间轮定时,快慢线程池分发,保证准时与隔离。
  3. 任务执行:本地线程池消化,配合阻塞与超时策略。
  4. 结果回调:异步上报,支持失败重试。
  5. 负载均衡:丰富的路由策略,适应不同场景。
  6. 分布式锁:基于数据库锁,保证调度中心集群下的任务唯一性。

日常遇到的90%问题,基本都能在上述机制中找到答案。希望这篇深入源码分析能帮你摆脱“重启工程师”的困境,真正理解并掌控你手中的分布式任务调度工具。如果你对文中的某个细节有更深的问题,或想了解更多Spring Boot集成实战案例,可以来 云栈社区 和大家一起交流探讨。




上一篇:统计思维与可重复性危机:为什么多数科学结论可能不可靠?
下一篇:MacBook Neo搭配A18 Pro芯片:苹果的低价策略能否撬动校园与轻度办公市场?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 08:03 , Processed in 0.476888 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表