一、整体架构思路
构建一个高性能的批量任务处理器,核心在于合理利用Java并发包提供的强大工具,以线程池为基础,结合多种同步机制,实现任务的并行处理、进度监控和结果汇总。主要设计思路如下:
- 任务分片:将原始任务列表按指定大小拆分,每个分片作为一个子任务,避免单个任务过大导致的并发效率低下。
- 线程池执行:使用
ThreadPoolExecutor创建自定义线程池,提交所有子任务并发执行。
- 进度统计:使用线程安全的原子类(
AtomicInteger)统计已完成、失败任务数,避免并发计数问题。
- 结果汇总:使用线程安全的集合(
CopyOnWriteArrayList)存储每个任务的执行结果,支持后续查询。
- 分阶段协同:使用
CountDownLatch实现“等待所有子任务完成后汇总结果”,使用CyclicBarrier实现“多阶段任务协同”(例如“读取→处理→汇总”三步流程)。
- 容错处理:每个子任务执行时捕获异常,记录失败原因,确保单个任务失败不中断整体流程。
二、核心技术选型与流程图
- 线程池:
ThreadPoolExecutor(自定义配置,适配不同任务场景)。
- 并发工具:
CountDownLatch(等待所有子任务完成)、CyclicBarrier(多阶段任务协同)。
- 线程安全组件:
AtomicInteger(原子计数)、CopyOnWriteArrayList(线程安全结果存储)。
- 任务抽象:定义
Task接口,支持任意类型任务的扩展(如数据导入、短信发送)。
核心流程图:

三、处理器实现:BatchTaskProcessor 核心代码
掌握Java并发编程的精髓是设计此类工具的基础。以下是处理器的核心实现代码:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 批量任务处理器:支持任务分片、并发执行、进度监控、结果汇总
*/
public class BatchTaskProcessor<T> {
// 线程池(核心配置)
private final ExecutorService executorService;
// 任务分片大小
private final int batchSize;
// 已完成任务数(原子类,线程安全)
private final AtomicInteger completedCount = new AtomicInteger(0);
// 失败任务数(原子类,线程安全)
private final AtomicInteger failedCount = new AtomicInteger(0);
// 任务执行结果列表(线程安全集合)
private final List<TaskResult<T>> resultList = new CopyOnWriteArrayList<>();
/**
* 构造器:自定义线程池配置
* @param corePoolSize 核心线程数
* @param maxPoolSize 最大线程数
* @param keepAliveTime 非核心线程空闲时间
* @param batchSize 任务分片大小
*/
public BatchTaskProcessor(int corePoolSize, int maxPoolSize, long keepAliveTime, int batchSize) {
this.batchSize = Math.max(batchSize, 1); // 分片大小至少为1
// 初始化线程池(自定义配置,避免OOM)
this.executorService = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 有界任务队列,避免任务堆积
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:提交线程自行执行
);
}
/**
* 核心方法:执行批量任务(无阶段协同)
* @param taskList 原始任务列表
* @param taskHandler 任务处理器(自定义任务执行逻辑)
* @return 执行结果汇总
* @throws InterruptedException 线程中断异常
*/
public BatchResult<T> executeBatchTasks(List<T> taskList, TaskHandler<T> taskHandler) throws InterruptedException {
if (taskList == null || taskList.isEmpty()) {
return new BatchResult<>(0, 0, resultList);
}
// 1. 任务分片
List<List<T>> taskShards = splitTask(taskList, batchSize);
int totalTaskCount = taskList.size();
CountDownLatch countDownLatch = new CountDownLatch(taskShards.size());
System.out.println("📋 批量任务开始执行:");
System.out.println(" - 总任务数:" + totalTaskCount);
System.out.println(" - 任务分片数:" + taskShards.size());
System.out.println(" - 线程池线程数:" + executorService.toString());
// 2. 提交分片任务到线程池
for (int i = 0; i < taskShards.size(); i++) {
int shardIndex = i;
List<T> shard = taskShards.get(i);
executorService.submit(() -> {
try {
// 执行当前分片的所有任务
for (T task : shard) {
TaskResult<T> result = new TaskResult<>();
result.setTask(task);
try {
// 自定义任务执行逻辑(如导入数据、发送短信)
boolean success = taskHandler.process(task);
result.setSuccess(success);
if (success) {
completedCount.incrementAndGet();
} else {
failedCount.incrementAndGet();
result.setErrorMessage("任务执行失败(无异常)");
}
} catch (Exception e) {
// 捕获任务执行异常,记录失败原因
failedCount.incrementAndGet();
result.setSuccess(false);
result.setErrorMessage("任务执行失败:" + e.getMessage());
}
// 存储任务结果(线程安全)
resultList.add(result);
}
System.out.printf("✅ 分片%d执行完成,处理任务数:%d\n", shardIndex, shard.size());
} finally {
// 分片任务完成,倒计时减1(必须在finally中,确保必执行)
countDownLatch.countDown();
}
});
}
// 3. 主线程阻塞等待所有分片任务完成
countDownLatch.await();
// 4. 关闭线程池(不再接收新任务,等待已提交任务完成)
executorService.shutdown();
// 5. 返回汇总结果
return new BatchResult<>(completedCount.get(), failedCount.get(), resultList);
}
/**
* 进阶方法:分阶段批量任务(如“读取→处理→汇总”)
* @param taskList 原始任务列表
* @param readHandler 读取阶段处理器
* @param processHandler 处理阶段处理器
* @param summaryHandler 汇总阶段处理器
* @return 最终汇总结果
* @throws InterruptedException 线程中断异常
* @throws BrokenBarrierException 屏障损坏异常
*/
public String executePhasedBatchTasks(List<T> taskList,
TaskHandler<T> readHandler,
TaskHandler<T> processHandler,
SummaryHandler<T> summaryHandler) throws InterruptedException, BrokenBarrierException {
if (taskList == null || taskList.isEmpty()) {
return "无任务可执行";
}
// 任务分片
List<List<T>> taskShards = splitTask(taskList, batchSize);
int shardCount = taskShards.size();
// CyclicBarrier:分阶段协同,所有分片完成当前阶段后,进入下一阶段
CyclicBarrier barrier = new CyclicBarrier(shardCount, () -> {
// 屏障动作:所有分片完成当前阶段后执行(如打印阶段进度)
System.out.println("\n🚩 当前阶段所有分片执行完成,进入下一阶段");
});
// 初始化线程池
ExecutorService phasedExecutor = new ThreadPoolExecutor(
shardCount, shardCount, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
System.out.println("📋 分阶段批量任务开始执行:");
System.out.println(" - 总任务数:" + taskList.size());
System.out.println(" - 分片数:" + shardCount);
// 提交分阶段任务
for (int i = 0; i < shardCount; i++) {
int shardIndex = i;
List<T> shard = taskShards.get(i);
phasedExecutor.submit(() -> {
try {
// 第一阶段:读取数据(如从文件/数据库读取)
System.out.printf("📥 分片%d开始读取阶段\n", shardIndex);
for (T task : shard) {
readHandler.process(task);
}
System.out.printf("📥 分片%d读取阶段完成\n", shardIndex);
// 等待所有分片完成读取阶段
barrier.await();
// 第二阶段:处理数据(如数据清洗、业务逻辑处理)
System.out.printf("⚙️ 分片%d开始处理阶段\n", shardIndex);
for (T task : shard) {
processHandler.process(task);
}
System.out.printf("⚙️ 分片%d处理阶段完成\n", shardIndex);
// 等待所有分片完成处理阶段
barrier.await();
// 第三阶段:汇总数据(如统计分片结果)
System.out.printf("📊 分片%d开始汇总阶段\n", shardIndex);
summaryHandler.summary(shard);
System.out.printf("📊 分片%d汇总阶段完成\n", shardIndex);
// 等待所有分片完成汇总阶段
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
System.err.printf("❌ 分片%d执行异常:%s\n", shardIndex, e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
System.err.printf("❌ 分片%d执行异常:%s\n", shardIndex, e.getMessage());
}
});
}
// 等待所有任务完成,关闭线程池
phasedExecutor.shutdown();
while (!phasedExecutor.isTerminated()) {
Thread.sleep(100);
}
// 最终汇总结果
return summaryHandler.getFinalSummary();
}
/**
* 工具方法:任务分片(将原始任务列表按batchSize拆分)
*/
private List<List<T>> splitTask(List<T> taskList, int batchSize) {
List<List<T>> shards = new ArrayList<>();
int total = taskList.size();
int start = 0;
while (start < total) {
int end = Math.min(start + batchSize, total);
shards.add(taskList.subList(start, end));
start = end;
}
return shards;
}
/**
* 任务处理器接口(自定义任务执行逻辑)
*/
public interface TaskHandler<T> {
boolean process(T task) throws Exception;
}
/**
* 汇总处理器接口(自定义汇总逻辑)
*/
public interface SummaryHandler<T> {
void summary(List<T> shardTask);
String getFinalSummary();
}
/**
* 单个任务执行结果实体
*/
public static class TaskResult<T> {
private T task;
private boolean success;
private String errorMessage;
// getter/setter
public T getTask() { return task; }
public void setTask(T task) { this.task = task; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}
/**
* 批量任务汇总结果实体
*/
public static class BatchResult<T> {
private int completedCount; // 成功任务数
private int failedCount; // 失败任务数
private List<TaskResult<T>> taskResults; // 所有任务结果
public BatchResult(int completedCount, int failedCount, List<TaskResult<T>> taskResults) {
this.completedCount = completedCount;
this.failedCount = failedCount;
this.taskResults = taskResults;
}
// getter
public int getCompletedCount() { return completedCount; }
public int getFailedCount() { return failedCount; }
public List<TaskResult<T>> getTaskResults() { return taskResults; }
@Override
public String toString() {
return "\n📊 批量任务执行汇总:" +
"\n - 总任务数:" + (completedCount + failedCount) +
"\n - 成功数:" + completedCount +
"\n - 失败数:" + failedCount +
"\n - 成功率:" + String.format("%.2f%%", (double) completedCount / (completedCount + failedCount) * 100);
}
}
}
四、使用示例:2 种常见场景
场景 1:基础批量任务(无阶段协同)
以下示例模拟批量发送短信任务,无需分阶段,直接并发执行所有任务:
import java.util.Arrays;
import java.util.List;
public class BasicBatchDemo {
public static void main(String[] args) {
// 1. 模拟100条短信发送任务(任务数据:手机号)
List<String> phoneList = Arrays.asList(
"13800138000", "13900139000", "13700137000",
// ... 省略97条手机号
"13600136000"
);
// 2. 创建批量任务处理器(CPU密集型任务:核心线程数=CPU核心数+1;IO密集型=2*CPU核心数+1)
// 短信发送是IO密集型(网络请求),此处配置8线程,分片大小10
BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
8, 10, 60, 10
);
// 3. 自定义任务执行逻辑(短信发送)
BatchTaskProcessor.TaskHandler<String> smsHandler = phone -> {
// 模拟短信发送逻辑(如调用第三方短信API)
System.out.printf("发送短信到:%s\n", phone);
Thread.sleep(100); // 模拟网络请求耗时
return true; // 执行成功
};
// 4. 执行批量任务并获取结果
try {
BatchTaskProcessor.BatchResult<String> result = processor.executeBatchTasks(phoneList, smsHandler);
System.out.println(result);
// 5. 输出失败任务详情(可选)
result.getTaskResults().stream()
.filter(taskResult -> !taskResult.isSuccess())
.forEach(taskResult -> System.out.printf("❌ 失败任务:%s,原因:%s\n",
taskResult.getTask(), taskResult.getErrorMessage()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
场景 2:分阶段批量任务(读取→处理→汇总)
以下示例模拟批量导入用户数据任务,分三阶段执行:读取 Excel 数据→清洗数据→汇总统计:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class PhasedBatchDemo {
public static void main(String[] args) {
// 1. 模拟100条原始用户数据(待导入)
List<String> userDataList = Arrays.asList(
"张三,20,13800138000", "李四,25,13900139000",
// ... 省略98条数据
"王五,30,13700137000"
);
// 2. 创建批量任务处理器(分片大小15)
BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
6, 8, 60, 15
);
// 3. 自定义分阶段处理器
// 第一阶段:读取数据(模拟从Excel读取)
BatchTaskProcessor.TaskHandler<String> readHandler = data -> {
System.out.printf("读取数据:%s\n", data);
return true;
};
// 第二阶段:处理数据(模拟数据清洗,去除无效数据)
BatchTaskProcessor.TaskHandler<String> processHandler = data -> {
String[] parts = data.split(",");
if (parts.length != 3) {
throw new RuntimeException("数据格式错误:" + data);
}
System.out.printf("清洗数据:%s→%s(格式校验通过)\n", data, parts[0]);
return true;
};
// 第三阶段:汇总统计(统计成功导入的用户数)
AtomicInteger totalImported = new AtomicInteger(0);
BatchTaskProcessor.SummaryHandler<String> summaryHandler = new BatchTaskProcessor.SummaryHandler<String>() {
@Override
public void summary(List<String> shardTask) {
// 统计当前分片的有效数据数
int validCount = (int) shardTask.stream()
.filter(data -> data.split(",").length == 3)
.count();
totalImported.addAndGet(validCount);
}
@Override
public String getFinalSummary() {
return String.format("\n📊 分阶段任务汇总:成功导入用户数=%d,总处理数据数=%d",
totalImported.get(), userDataList.size());
}
};
// 4. 执行分阶段批量任务
try {
String finalResult = processor.executePhasedBatchTasks(
userDataList, readHandler, processHandler, summaryHandler
);
System.out.println(finalResult);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
五、核心技术解析:线程池与并发工具的协同
线程池:批量任务的“高效执行引擎”
- 自定义配置适配场景:通过
corePoolSize、maxPoolSize等参数,根据任务类型(CPU密集型 / IO密集型)调整线程数,优化资源利用。
- 任务分片提升并发:将大量任务拆分为小分片,避免单个任务过大导致的线程阻塞,同时减少线程上下文切换开销。
- 拒绝策略保障稳健:使用
CallerRunsPolicy拒绝策略,当任务队列满时,由提交线程自行执行任务,这是一种简单的数据库与中间件任务处理中常用的防丢失策略。
CountDownLatch:单向等待的“同步开关”
- 用于“主线程等待所有子任务完成”:初始化倒计时次数为分片数,每个分片任务完成后调用
countDown(),主线程通过await()阻塞等待,确保汇总结果时所有任务已执行完毕。
finally块中调用countDown():确保即使任务执行异常,倒计时也会正常减少,避免主线程无限阻塞。
CyclicBarrier:多阶段协同的“阶段闸门”
- 用于“分阶段任务协同”:每个阶段所有分片完成后,通过屏障动作(如打印阶段进度)通知,再进入下一阶段,确保阶段执行的有序性。
- 可重复使用:相比
CountDownLatch,CyclicBarrier支持多轮同步,完美适配“读取→处理→汇总”等多阶段流程。
线程安全组件:并发环境的数据保障
AtomicInteger:原子计数,避免多线程并发修改任务数导致的计数错误。
CopyOnWriteArrayList:线程安全的集合,适用于“读多写少”的场景(批量任务中结果读取频率低于写入频率),避免ArrayList在并发环境下的ConcurrentModificationException。
六、避坑指南:使用注意事项
-
线程池配置需适配任务类型:
- CPU密集型任务(如数据计算):核心线程数设为CPU核心数 + 1。
- IO密集型任务(如网络请求、文件读写):核心线程数可设为
2 * CPU核心数 + 1。
- 任务队列必须用有界队列(如
ArrayBlockingQueue),避免无界队列导致的任务堆积和OOM。
-
并发工具的正确使用:
CountDownLatch:确保countDown()调用次数与初始化次数一致,建议在finally块中执行。
CyclicBarrier:注意处理BrokenBarrierException(如线程中断、超时导致的屏障损坏)。
- 避免在屏障动作中执行耗时操作,该动作由最后一个到达的线程执行,耗时操作会阻塞所有线程。
-
任务分片大小合理设置:
- 分片过小:导致分片数过多,线程切换频繁,效率下降。
- 分片过大:单个分片任务执行时间过长,并发优势不明显。
- 建议:根据任务平均执行时间调整,IO密集型任务的分片大小可适当增大。
-
异常处理不可忽视:
- 单个任务执行异常必须捕获,避免单个任务失败导致整个分片任务中断。
- 记录失败任务详情,方便后续复盘和重试,提升批量任务的可维护性。
七、总结
本文实现的高性能批量任务处理器,其核心在于巧妙组合Java的“线程池”与“并发工具”。线程池解决了“任务高效执行”的问题,而CountDownLatch、CyclicBarrier等工具则解决了“任务同步协同”的难题。通过合理的分片策略、线程安全的数据结构和全面的异常处理,该处理器能够稳健地应对数据导入、消息推送等多种批量处理场景,是提升后端服务处理能力的有效实践。