找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

467

积分

0

好友

63

主题
发表于 前天 12:40 | 查看: 9| 回复: 0

一、整体架构思路

构建一个高性能的批量任务处理器,核心在于合理利用Java并发包提供的强大工具,以线程池为基础,结合多种同步机制,实现任务的并行处理、进度监控和结果汇总。主要设计思路如下:

  • 任务分片:将原始任务列表按指定大小拆分,每个分片作为一个子任务,避免单个任务过大导致的并发效率低下。
  • 线程池执行:使用ThreadPoolExecutor创建自定义线程池,提交所有子任务并发执行。
  • 进度统计:使用线程安全的原子类(AtomicInteger)统计已完成、失败任务数,避免并发计数问题。
  • 结果汇总:使用线程安全的集合(CopyOnWriteArrayList)存储每个任务的执行结果,支持后续查询。
  • 分阶段协同:使用CountDownLatch实现“等待所有子任务完成后汇总结果”,使用CyclicBarrier实现“多阶段任务协同”(例如“读取→处理→汇总”三步流程)。
  • 容错处理:每个子任务执行时捕获异常,记录失败原因,确保单个任务失败不中断整体流程。

二、核心技术选型与流程图

  1. 线程池ThreadPoolExecutor(自定义配置,适配不同任务场景)。
  2. 并发工具CountDownLatch(等待所有子任务完成)、CyclicBarrier(多阶段任务协同)。
  3. 线程安全组件AtomicInteger(原子计数)、CopyOnWriteArrayList(线程安全结果存储)。
  4. 任务抽象:定义Task接口,支持任意类型任务的扩展(如数据导入、短信发送)。

核心流程图批量任务处理器核心流程图

三、处理器实现:BatchTaskProcessor 核心代码

掌握Java并发编程的精髓是设计此类工具的基础。以下是处理器的核心实现代码:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 批量任务处理器:支持任务分片、并发执行、进度监控、结果汇总
 */
public class BatchTaskProcessor<T> {
    // 线程池(核心配置)
    private final ExecutorService executorService;
    // 任务分片大小
    private final int batchSize;
    // 已完成任务数(原子类,线程安全)
    private final AtomicInteger completedCount = new AtomicInteger(0);
    // 失败任务数(原子类,线程安全)
    private final AtomicInteger failedCount = new AtomicInteger(0);
    // 任务执行结果列表(线程安全集合)
    private final List<TaskResult<T>> resultList = new CopyOnWriteArrayList<>();

    /**
     * 构造器:自定义线程池配置
     * @param corePoolSize 核心线程数
     * @param maxPoolSize 最大线程数
     * @param keepAliveTime 非核心线程空闲时间
     * @param batchSize 任务分片大小
     */
    public BatchTaskProcessor(int corePoolSize, int maxPoolSize, long keepAliveTime, int batchSize) {
        this.batchSize = Math.max(batchSize, 1); // 分片大小至少为1
        // 初始化线程池(自定义配置,避免OOM)
        this.executorService = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), // 有界任务队列,避免任务堆积
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:提交线程自行执行
        );
    }

    /**
     * 核心方法:执行批量任务(无阶段协同)
     * @param taskList 原始任务列表
     * @param taskHandler 任务处理器(自定义任务执行逻辑)
     * @return 执行结果汇总
     * @throws InterruptedException 线程中断异常
     */
    public BatchResult<T> executeBatchTasks(List<T> taskList, TaskHandler<T> taskHandler) throws InterruptedException {
        if (taskList == null || taskList.isEmpty()) {
            return new BatchResult<>(0, 0, resultList);
        }

        // 1. 任务分片
        List<List<T>> taskShards = splitTask(taskList, batchSize);
        int totalTaskCount = taskList.size();
        CountDownLatch countDownLatch = new CountDownLatch(taskShards.size());
        System.out.println("📋 批量任务开始执行:");
        System.out.println(" - 总任务数:" + totalTaskCount);
        System.out.println(" - 任务分片数:" + taskShards.size());
        System.out.println(" - 线程池线程数:" + executorService.toString());

        // 2. 提交分片任务到线程池
        for (int i = 0; i < taskShards.size(); i++) {
            int shardIndex = i;
            List<T> shard = taskShards.get(i);
            executorService.submit(() -> {
                try {
                    // 执行当前分片的所有任务
                    for (T task : shard) {
                        TaskResult<T> result = new TaskResult<>();
                        result.setTask(task);
                        try {
                            // 自定义任务执行逻辑(如导入数据、发送短信)
                            boolean success = taskHandler.process(task);
                            result.setSuccess(success);
                            if (success) {
                                completedCount.incrementAndGet();
                            } else {
                                failedCount.incrementAndGet();
                                result.setErrorMessage("任务执行失败(无异常)");
                            }
                        } catch (Exception e) {
                            // 捕获任务执行异常,记录失败原因
                            failedCount.incrementAndGet();
                            result.setSuccess(false);
                            result.setErrorMessage("任务执行失败:" + e.getMessage());
                        }
                        // 存储任务结果(线程安全)
                        resultList.add(result);
                    }
                    System.out.printf("✅ 分片%d执行完成,处理任务数:%d\n", shardIndex, shard.size());
                } finally {
                    // 分片任务完成,倒计时减1(必须在finally中,确保必执行)
                    countDownLatch.countDown();
                }
            });
        }

        // 3. 主线程阻塞等待所有分片任务完成
        countDownLatch.await();

        // 4. 关闭线程池(不再接收新任务,等待已提交任务完成)
        executorService.shutdown();

        // 5. 返回汇总结果
        return new BatchResult<>(completedCount.get(), failedCount.get(), resultList);
    }

    /**
     * 进阶方法:分阶段批量任务(如“读取→处理→汇总”)
     * @param taskList 原始任务列表
     * @param readHandler 读取阶段处理器
     * @param processHandler 处理阶段处理器
     * @param summaryHandler 汇总阶段处理器
     * @return 最终汇总结果
     * @throws InterruptedException 线程中断异常
     * @throws BrokenBarrierException 屏障损坏异常
     */
    public String executePhasedBatchTasks(List<T> taskList,
                                          TaskHandler<T> readHandler,
                                          TaskHandler<T> processHandler,
                                          SummaryHandler<T> summaryHandler) throws InterruptedException, BrokenBarrierException {
        if (taskList == null || taskList.isEmpty()) {
            return "无任务可执行";
        }

        // 任务分片
        List<List<T>> taskShards = splitTask(taskList, batchSize);
        int shardCount = taskShards.size();

        // CyclicBarrier:分阶段协同,所有分片完成当前阶段后,进入下一阶段
        CyclicBarrier barrier = new CyclicBarrier(shardCount, () -> {
            // 屏障动作:所有分片完成当前阶段后执行(如打印阶段进度)
            System.out.println("\n🚩 当前阶段所有分片执行完成,进入下一阶段");
        });

        // 初始化线程池
        ExecutorService phasedExecutor = new ThreadPoolExecutor(
                shardCount, shardCount, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        System.out.println("📋 分阶段批量任务开始执行:");
        System.out.println(" - 总任务数:" + taskList.size());
        System.out.println(" - 分片数:" + shardCount);

        // 提交分阶段任务
        for (int i = 0; i < shardCount; i++) {
            int shardIndex = i;
            List<T> shard = taskShards.get(i);
            phasedExecutor.submit(() -> {
                try {
                    // 第一阶段:读取数据(如从文件/数据库读取)
                    System.out.printf("📥 分片%d开始读取阶段\n", shardIndex);
                    for (T task : shard) {
                        readHandler.process(task);
                    }
                    System.out.printf("📥 分片%d读取阶段完成\n", shardIndex);
                    // 等待所有分片完成读取阶段
                    barrier.await();

                    // 第二阶段:处理数据(如数据清洗、业务逻辑处理)
                    System.out.printf("⚙️  分片%d开始处理阶段\n", shardIndex);
                    for (T task : shard) {
                        processHandler.process(task);
                    }
                    System.out.printf("⚙️  分片%d处理阶段完成\n", shardIndex);
                    // 等待所有分片完成处理阶段
                    barrier.await();

                    // 第三阶段:汇总数据(如统计分片结果)
                    System.out.printf("📊 分片%d开始汇总阶段\n", shardIndex);
                    summaryHandler.summary(shard);
                    System.out.printf("📊 分片%d汇总阶段完成\n", shardIndex);
                    // 等待所有分片完成汇总阶段
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.err.printf("❌ 分片%d执行异常:%s\n", shardIndex, e.getMessage());
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    System.err.printf("❌ 分片%d执行异常:%s\n", shardIndex, e.getMessage());
                }
            });
        }

        // 等待所有任务完成,关闭线程池
        phasedExecutor.shutdown();
        while (!phasedExecutor.isTerminated()) {
            Thread.sleep(100);
        }

        // 最终汇总结果
        return summaryHandler.getFinalSummary();
    }

    /**
     * 工具方法:任务分片(将原始任务列表按batchSize拆分)
     */
    private List<List<T>> splitTask(List<T> taskList, int batchSize) {
        List<List<T>> shards = new ArrayList<>();
        int total = taskList.size();
        int start = 0;
        while (start < total) {
            int end = Math.min(start + batchSize, total);
            shards.add(taskList.subList(start, end));
            start = end;
        }
        return shards;
    }

    /**
     * 任务处理器接口(自定义任务执行逻辑)
     */
    public interface TaskHandler<T> {
        boolean process(T task) throws Exception;
    }

    /**
     * 汇总处理器接口(自定义汇总逻辑)
     */
    public interface SummaryHandler<T> {
        void summary(List<T> shardTask);
        String getFinalSummary();
    }

    /**
     * 单个任务执行结果实体
     */
    public static class TaskResult<T> {
        private T task;
        private boolean success;
        private String errorMessage;
        // getter/setter
        public T getTask() { return task; }
        public void setTask(T task) { this.task = task; }
        public boolean isSuccess() { return success; }
        public void setSuccess(boolean success) { this.success = success; }
        public String getErrorMessage() { return errorMessage; }
        public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
    }

    /**
     * 批量任务汇总结果实体
     */
    public static class BatchResult<T> {
        private int completedCount; // 成功任务数
        private int failedCount;    // 失败任务数
        private List<TaskResult<T>> taskResults; // 所有任务结果

        public BatchResult(int completedCount, int failedCount, List<TaskResult<T>> taskResults) {
            this.completedCount = completedCount;
            this.failedCount = failedCount;
            this.taskResults = taskResults;
        }

        // getter
        public int getCompletedCount() { return completedCount; }
        public int getFailedCount() { return failedCount; }
        public List<TaskResult<T>> getTaskResults() { return taskResults; }

        @Override
        public String toString() {
            return "\n📊 批量任务执行汇总:" +
                    "\n - 总任务数:" + (completedCount + failedCount) +
                    "\n - 成功数:" + completedCount +
                    "\n - 失败数:" + failedCount +
                    "\n - 成功率:" + String.format("%.2f%%", (double) completedCount / (completedCount + failedCount) * 100);
        }
    }
}

四、使用示例:2 种常见场景

场景 1:基础批量任务(无阶段协同)

以下示例模拟批量发送短信任务,无需分阶段,直接并发执行所有任务:

import java.util.Arrays;
import java.util.List;

public class BasicBatchDemo {
    public static void main(String[] args) {
        // 1. 模拟100条短信发送任务(任务数据:手机号)
        List<String> phoneList = Arrays.asList(
                "13800138000", "13900139000", "13700137000",
                // ... 省略97条手机号
                "13600136000"
        );

        // 2. 创建批量任务处理器(CPU密集型任务:核心线程数=CPU核心数+1;IO密集型=2*CPU核心数+1)
        // 短信发送是IO密集型(网络请求),此处配置8线程,分片大小10
        BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
                8, 10, 60, 10
        );

        // 3. 自定义任务执行逻辑(短信发送)
        BatchTaskProcessor.TaskHandler<String> smsHandler = phone -> {
            // 模拟短信发送逻辑(如调用第三方短信API)
            System.out.printf("发送短信到:%s\n", phone);
            Thread.sleep(100); // 模拟网络请求耗时
            return true; // 执行成功
        };

        // 4. 执行批量任务并获取结果
        try {
            BatchTaskProcessor.BatchResult<String> result = processor.executeBatchTasks(phoneList, smsHandler);
            System.out.println(result);

            // 5. 输出失败任务详情(可选)
            result.getTaskResults().stream()
                    .filter(taskResult -> !taskResult.isSuccess())
                    .forEach(taskResult -> System.out.printf("❌ 失败任务:%s,原因:%s\n",
                            taskResult.getTask(), taskResult.getErrorMessage()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

场景 2:分阶段批量任务(读取→处理→汇总)

以下示例模拟批量导入用户数据任务,分三阶段执行:读取 Excel 数据→清洗数据→汇总统计:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PhasedBatchDemo {
    public static void main(String[] args) {
        // 1. 模拟100条原始用户数据(待导入)
        List<String> userDataList = Arrays.asList(
                "张三,20,13800138000", "李四,25,13900139000",
                // ... 省略98条数据
                "王五,30,13700137000"
        );

        // 2. 创建批量任务处理器(分片大小15)
        BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
                6, 8, 60, 15
        );

        // 3. 自定义分阶段处理器
        // 第一阶段:读取数据(模拟从Excel读取)
        BatchTaskProcessor.TaskHandler<String> readHandler = data -> {
            System.out.printf("读取数据:%s\n", data);
            return true;
        };

        // 第二阶段:处理数据(模拟数据清洗,去除无效数据)
        BatchTaskProcessor.TaskHandler<String> processHandler = data -> {
            String[] parts = data.split(",");
            if (parts.length != 3) {
                throw new RuntimeException("数据格式错误:" + data);
            }
            System.out.printf("清洗数据:%s→%s(格式校验通过)\n", data, parts[0]);
            return true;
        };

        // 第三阶段:汇总统计(统计成功导入的用户数)
        AtomicInteger totalImported = new AtomicInteger(0);
        BatchTaskProcessor.SummaryHandler<String> summaryHandler = new BatchTaskProcessor.SummaryHandler<String>() {
            @Override
            public void summary(List<String> shardTask) {
                // 统计当前分片的有效数据数
                int validCount = (int) shardTask.stream()
                        .filter(data -> data.split(",").length == 3)
                        .count();
                totalImported.addAndGet(validCount);
            }

            @Override
            public String getFinalSummary() {
                return String.format("\n📊 分阶段任务汇总:成功导入用户数=%d,总处理数据数=%d",
                        totalImported.get(), userDataList.size());
            }
        };

        // 4. 执行分阶段批量任务
        try {
            String finalResult = processor.executePhasedBatchTasks(
                    userDataList, readHandler, processHandler, summaryHandler
            );
            System.out.println(finalResult);
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

五、核心技术解析:线程池与并发工具的协同

线程池:批量任务的“高效执行引擎”

  • 自定义配置适配场景:通过corePoolSizemaxPoolSize等参数,根据任务类型(CPU密集型 / IO密集型)调整线程数,优化资源利用。
  • 任务分片提升并发:将大量任务拆分为小分片,避免单个任务过大导致的线程阻塞,同时减少线程上下文切换开销。
  • 拒绝策略保障稳健:使用CallerRunsPolicy拒绝策略,当任务队列满时,由提交线程自行执行任务,这是一种简单的数据库与中间件任务处理中常用的防丢失策略。

CountDownLatch:单向等待的“同步开关”

  • 用于“主线程等待所有子任务完成”:初始化倒计时次数为分片数,每个分片任务完成后调用countDown(),主线程通过await()阻塞等待,确保汇总结果时所有任务已执行完毕。
  • finally块中调用countDown():确保即使任务执行异常,倒计时也会正常减少,避免主线程无限阻塞。

CyclicBarrier:多阶段协同的“阶段闸门”

  • 用于“分阶段任务协同”:每个阶段所有分片完成后,通过屏障动作(如打印阶段进度)通知,再进入下一阶段,确保阶段执行的有序性。
  • 可重复使用:相比CountDownLatchCyclicBarrier支持多轮同步,完美适配“读取→处理→汇总”等多阶段流程。

线程安全组件:并发环境的数据保障

  • AtomicInteger:原子计数,避免多线程并发修改任务数导致的计数错误。
  • CopyOnWriteArrayList:线程安全的集合,适用于“读多写少”的场景(批量任务中结果读取频率低于写入频率),避免ArrayList在并发环境下的ConcurrentModificationException

六、避坑指南:使用注意事项

  1. 线程池配置需适配任务类型

    • CPU密集型任务(如数据计算):核心线程数设为CPU核心数 + 1。
    • IO密集型任务(如网络请求、文件读写):核心线程数可设为2 * CPU核心数 + 1
    • 任务队列必须用有界队列(如ArrayBlockingQueue),避免无界队列导致的任务堆积和OOM。
  2. 并发工具的正确使用

    • CountDownLatch:确保countDown()调用次数与初始化次数一致,建议在finally块中执行。
    • CyclicBarrier:注意处理BrokenBarrierException(如线程中断、超时导致的屏障损坏)。
    • 避免在屏障动作中执行耗时操作,该动作由最后一个到达的线程执行,耗时操作会阻塞所有线程。
  3. 任务分片大小合理设置

    • 分片过小:导致分片数过多,线程切换频繁,效率下降。
    • 分片过大:单个分片任务执行时间过长,并发优势不明显。
    • 建议:根据任务平均执行时间调整,IO密集型任务的分片大小可适当增大。
  4. 异常处理不可忽视

    • 单个任务执行异常必须捕获,避免单个任务失败导致整个分片任务中断。
    • 记录失败任务详情,方便后续复盘和重试,提升批量任务的可维护性。

七、总结

本文实现的高性能批量任务处理器,其核心在于巧妙组合Java的“线程池”与“并发工具”。线程池解决了“任务高效执行”的问题,而CountDownLatchCyclicBarrier等工具则解决了“任务同步协同”的难题。通过合理的分片策略、线程安全的数据结构和全面的异常处理,该处理器能够稳健地应对数据导入、消息推送等多种批量处理场景,是提升后端服务处理能力的有效实践。




上一篇:Linux RTC驱动框架解析与开发指南:从硬件到驱动实践
下一篇:GPT Researcher v0.3.0实战指南:利用24K星开源AI工具10分钟生成专业报告
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-7 17:12 , Processed in 0.074313 second(s), 36 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表