找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1618

积分

0

好友

206

主题
发表于 2026-2-14 03:03:10 | 查看: 30| 回复: 0

前言

Spring Batch 是一个基于 Spring 的成熟、轻量级批处理框架,专为大规模数据处理场景设计。它不是定时调度框架(如 Quartz),但可与之协同;其核心价值在于提供可控执行、事务粒度管理、失败重试、跳过容错、进度追踪与状态持久化等企业级能力。

为什么说它“方便使用”?——深度集成 Spring 生态,配置即编码,组件职责清晰,学习曲线平缓。
为什么说它“较健全”?——开箱即用的 JobRepositoryJobLauncherStepItemReader/Processor/Writer 等抽象,覆盖了批处理全生命周期的关键痛点。

本文将结合真实业务场景,手把手带你完成两个典型任务:

  • ✅ 从 bloginfo.csv 文件批量读取 8 万余条数据 → 业务逻辑处理 → 写入 MySQL 表  
  • ✅ 从数据库 bloginfo 表按条件查询 → 动态参数化处理 → 写入新表 bloginfonew

所有代码均基于 Spring Boot 2.7.x + MyBatis + HikariCP,兼容主流生产环境,拒绝“玩具示例”。

💡 提示:文中涉及的 druid 连接池与 MyBatisCursorItemReader 存在兼容性问题(详见后文排障章节),我们已给出可落地的 HikariCP 替代方案。

Spring Batch 核心组件流程图:左侧 JobRepository(存储)、JobLauncher(启动器)、Job(任务);右侧 Step(步骤)含 ItemReader(读取)、ItemProcessor(处理)、ItemWriter(写入),箭头标注“循环逐条执行”与“达到设置值后事务写入”

数据准备:建表与样例数据

首先创建用于存储和读取的数据库表 bloginfo

CREATE TABLE `bloginfo` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者标识',
  `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客链接',
  `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客标题',
  `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客栏目',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

同时准备一份 bloginfo.csv 文件(约 8 万行),结构如下(Excel 截图示意):

Excel 表格截图:A 列为序号,B 列为 URL(http://testAddData.com),C 列为“测试数据”,D 列为“springboot”,右下角标注“8万多条测试数据”

项目依赖与基础配置

Maven 依赖(pom.xml)

确保引入以下核心依赖(注意版本兼容性):

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>

<!-- Spring Batch 核心 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- Hibernate Validator(用于数据校验) -->
<dependency>
  <groupId>org.hibernate</groupId>
  <artifactId>hibernate-validator</artifactId>
  <version>6.0.7.Final</version>
</dependency>

<!-- MyBatis 整合 -->
<dependency>
  <groupId>org.mybatis.spring.boot</groupId>
  <artifactId>mybatis-spring-boot-starter</artifactId>
  <version>2.0.0</version>
</dependency>

<!-- MySQL 驱动 -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>

<!-- ⚠️ 注意:druid 在 MyBatisCursorItemReader 场景下存在兼容问题,此处暂不启用 -->
<!-- 
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid-spring-boot-starter</artifactId>
  <version>1.1.18</version>
</dependency>
-->

Spring Boot 配置(application.yml)

关键点说明:

  • spring.batch.job.enabled: false:禁用自动启动 Job,由我们通过 JobLauncher.run() 显式触发;
  • spring.batch.initialize-schema: always:每次启动自动初始化 Batch 元数据表(如 batch_job_execution);
  • 数据源使用 Spring Boot 2.x 默认推荐的 HikariCP(无需额外依赖,开箱即用);
spring:
  batch:
    job:
      enabled: false
      initialize-schema: always
      # table-prefix: my-batch  # 如需自定义前缀可取消注释

  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
    driver-class-name: com.mysql.cj.jdbc.Driver
    # HikariCP 默认配置已足够,无需显式声明;如需调优可参考:
    # hikari:
    #   connection-timeout: 30000
    #   maximum-pool-size: 20

server:
  port: 8665

YAML 配置截图:突出显示 `job.enabled: false` 和 `initialize-schema: always`,右侧中文注释说明作用

核心实体与 Mapper 层

POJO:BlogInfo.java

/**
 * @Author : JCccc
 * @Description :
 **/
public class BlogInfo {
    private Integer id;
    private String blogAuthor;
    private String blogUrl;
    private String blogTitle;
    private String blogItem;

    @Override
    public String toString() {
        return "BlogInfo{" +
                "id=" + id +
                ", blogAuthor='" + blogAuthor + '\'' +
                ", blogUrl='" + blogUrl + '\'' +
                ", blogTitle='" + blogTitle + '\'' +
                ", blogItem='" + blogItem + '\'' +
                '}';
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getBlogAuthor() { return blogAuthor; }
    public void setBlogAuthor(String blogAuthor) { this.blogAuthor = blogAuthor; }

    public String getBlogUrl() { return blogUrl; }
    public void setBlogUrl(String blogUrl) { this.blogUrl = blogUrl; }

    public String getBlogTitle() { return blogTitle; }
    public void setBlogTitle(String blogTitle) { this.blogTitle = blogTitle; }

    public String getBlogItem() { return blogItem; }
    public void setBlogItem(String blogItem) { this.blogItem = blogItem; }
}

MyBatis Mapper 接口(注解方式)

import com.example.batchdemo.pojo.BlogInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;

/**
 * @Author : JCccc
 * @Description :
 **/
@Mapper
public interface BlogMapper {
    @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem ) VALUES ( #{blogAuthor}, #{blogUrl}, #{blogTitle}, #{blogItem}) ")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(BlogInfo bloginfo);

    @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
    List<BlogInfo> queryInfoById(Map<String, Integer> map);
}

Spring Batch 核心配置类(MyBatchConfig.java)

使用 @Configuration + @EnableBatchProcessing 启用批处理支持:

@Configuration
@EnableBatchProcessing
public class MyBatchConfig {
    private Logger logger = LoggerFactory.getLogger(MyBatchConfig.class);

    // ======== JobRepository:Job 元数据存储与事务管理 ========
    @Bean
    public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }

    // ======== JobLauncher:Job 执行入口 ========
    @Bean
    public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    // ======== Job:定义一个完整批处理任务 ========
    @Bean
    public Job myJob(JobBuilderFactory jobs, Step myStep) {
        return jobs.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(myJobListener())
                .build();
    }

    // ======== Job 监听器 ========
    @Bean
    public MyJobListener myJobListener() {
        return new MyJobListener();
    }

    // ======== Step:核心执行单元,串联 Reader → Processor → Writer ========
    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory,
                       ItemReader<BlogInfo> reader,
                       ItemWriter<BlogInfo> writer,
                       ItemProcessor<BlogInfo, BlogInfo> processor) {
        return stepBuilderFactory
                .get("myStep")
                .<BlogInfo, BlogInfo>chunk(6500) // 每次处理 6500 条后提交事务
                .reader(reader)
                    .faultTolerant()
                    .retryLimit(3).retry(Exception.class)
                    .skip(Exception.class).skipLimit(2)
                    .listener(new MyReadListener())
                .processor(processor)
                .writer(writer)
                    .faultTolerant()
                    .skip(Exception.class).skipLimit(2)
                    .listener(new MyWriteListener())
                .build();
    }
}

Java 配置类截图:高亮 `@Configuration` 和 `@EnableBatchProcessing` 注解,右侧中文注释“开启批处理的支持”

场景一:CSV 文件批处理(FlatFileItemReader)

ItemReader:读取 CSV 并映射为 BlogInfo

@Bean
public ItemReader<BlogInfo> reader() {
    FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
    // 指定 classpath 下文件路径
    reader.setResource(new ClassPathResource("static/bloginfo.csv"));
    // 定义字段映射规则
    reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {{
        setLineTokenizer(new DelimitedLineTokenizer() {{
            setNames(new String[]{"blogAuthor", "blogUrl", "blogTitle", "blogItem"});
        }});
        setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {{
            setTargetType(BlogInfo.class);
        }});
    }});
    return reader;
}

代码截图:高亮 `setResource(...)` 和 `setNames(...)`,右侧中文注释“资源路径”与“字段名称数组”

ItemProcessor:简单业务处理(字符串替换)

@Bean
public ItemProcessor<BlogInfo, BlogInfo> processor() {
    MyItemProcessor myItemProcessor = new MyItemProcessor();
    myItemProcessor.setValidator(myBeanValidator());
    return myItemProcessor;
}

// 自定义处理器逻辑
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        super.process(item); // 触发校验器
        if ("springboot".equals(item.getBlogItem())) {
            item.setBlogTitle("springboot 系列还请看看我Jc");
        } else {
            item.setBlogTitle("未知系列");
        }
        return item;
    }
}

ItemWriter:批量插入数据库(JdbcBatchItemWriter)

@Bean
public ItemWriter<BlogInfo> writer(DataSource dataSource) {
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    String sql = "insert into bloginfo " +
                 "(blogAuthor,blogUrl,blogTitle,blogItem) " +
                 "values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

代码截图:高亮 SQL 拼接与 `setDataSource()`,右侧中文注释“插入数据sql的编写”

场景二:数据库查询批处理(MyBatisCursorItemReader)

⚠️ 关键避坑:MyBatisCursorItemReader 与 Druid 连接池存在 SQLFeatureNotSupportedException 兼容问题(见后文日志)。解决方案:切换至 HikariCP(Spring Boot 2.x 默认)。

新增 Job 与 Step(复用大部分已有组件)

@Bean
public Job myJobNew(JobBuilderFactory jobs, Step stepNew) {
    return jobs.get("myJobNew")
            .incrementer(new RunIdIncrementer())
            .flow(stepNew)
            .end()
            .listener(myJobListener())
            .build();
}

@Bean
public Step stepNew(StepBuilderFactory stepBuilderFactory,
                    MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                    ItemWriter<BlogInfo> writerNew,
                    ItemProcessor<BlogInfo, BlogInfo> processorNew) {
    return stepBuilderFactory
            .get("stepNew")
            .<BlogInfo, BlogInfo>chunk(6500)
            .reader(itemReaderNew)
                .faultTolerant()
                .retryLimit(3).retry(Exception.class)
                .skip(Exception.class).skipLimit(10)
                .listener(new MyReadListener())
            .processor(processorNew)
            .writer(writerNew)
                .faultTolerant()
                .skip(Exception.class).skipLimit(2)
                .listener(new MyWriteListener())
            .build();
}

ItemReader:动态参数化数据库查询

@Autowired
private SqlSessionFactory sqlSessionFactory;

@Bean
@StepScope // 必须!使 JobParameters 参数在 Step 启动时注入
public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {
    System.out.println("开始查询数据库");
    MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();
    reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");
    reader.setSqlSessionFactory(sqlSessionFactory);
    Map<String, Object> map = new HashMap<>();
    map.put("authorId", Integer.valueOf(authorId));
    reader.setParameterValues(map);
    return reader;
}

代码截图:高亮 `@StepScope` 和 `@Value("#{jobParameters[authorId]}")`,右侧中文注释“从job处传参,实现动态SQL”

对应 Mapper 方法:

@Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
List<BlogInfo> queryInfoById(Map<String, Integer> map);

Mapper 方法截图:高亮 `#{authorId}` 占位符

ItemProcessor:按 authorId 分段处理标题

@Bean
public ItemProcessor<BlogInfo, BlogInfo> processorNew() {
    MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
    csvItemProcessor.setValidator(myBeanValidator());
    return csvItemProcessor;
}

public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        super.process(item);
        Integer authorId = Integer.valueOf(item.getBlogAuthor());
        if (authorId < 20000) {
            item.setBlogTitle("这是都是小于20000的数据");
        } else if (authorId >= 20000 && authorId < 30000) {
            item.setBlogTitle("这是都是小于30000但是大于20000的数据");
        } else {
            item.setBlogTitle("旧书不厌百回读");
        }
        return item;
    }
}

代码截图:高亮 `authorId &lt; 20000` 分支逻辑,右侧中文注释“新处理场景的模拟逻辑”

ItemWriter:写入新表 bloginfonew

@Bean
public ItemWriter<BlogInfo> writerNew(DataSource dataSource) {
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    String sql = "insert into bloginfonew " +
                 "(blogAuthor,blogUrl,blogTitle,blogItem) " +
                 "values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

代码截图:高亮表名 `bloginfonew` 和 SQL 拼接

启动与调用接口

Controller 触发 Job

@RestController
public class TestController {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job myJob;

    @Autowired
    Job myJobNew;

    @GetMapping("testJob")
    public void testJob() throws JobParametersInvalidException, JobExecutionException {
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
        jobLauncher.run(myJob, jobParameters);
    }

    @GetMapping("testJobNew")
    public void testJobNew(@RequestParam("authorId") String authorId)
            throws JobParametersInvalidException, JobExecutionException {
        JobParameters jobParametersNew = new JobParametersBuilder()
                .addLong("timeNew", System.currentTimeMillis())
                .addString("authorId", authorId)
                .toJobParameters();
        jobLauncher.run(myJobNew, jobParametersNew);
    }
}

接口调用截图:GET http://localhost:8665/testJobNew?authorId=12000,Query 参数页显示 authorId=12000

启动验证与结果查看

  1. 启动应用后,观察控制台日志,确认 batch_* 元数据表已自动创建:
    数据库表列表截图:显示 `batch_job_execution`、`batch_step_execution` 等表,以及 `bloginfo` 表

  2. 调用 /testJob,查看日志是否输出 COMPLETED
    终端日志截图:包含 `[COMPLETED]` 状态及 `Job [FlowJob] launched...` 信息

  3. 查看 bloginfo 表数据是否按预期处理并写入:
    数据库查询结果截图:`blogTitle` 字段已更新为 “springboot 系列还请看看我Jc”

  4. 调用 /testJobNew?authorId=12000,验证数据库查询场景:

    • ✅ 成功时:日志显示 [COMPLETED]bloginfonew 表中数据按 authorId 分段生成标题;
    • ❌ 失败时(若误用 Druid):报 java.sql.SQLFeatureNotSupportedException(见下图);
      错误日志截图:红色高亮 `SQLFeatureNotSupportedException` 及 `[FAILED]` 状态

关键排障:Druid 兼容性问题与 HikariCP 方案

问题现象

当使用 MyBatisCursorItemReader 时,若连接池为 Druid(druid-spring-boot-starter),会抛出:

Caused by: java.sql.SQLFeatureNotSupportedException: null
at com.alibaba.druid.pool.DruidPooledStatement.executeQuery(...)

根本原因

MyBatisCursorItemReader 底层调用 SqlSession.selectCursor(),依赖 JDBC ResultSet.TYPE_FORWARD_ONLY + CONCUR_READ_ONLY 游标类型,而 Druid 对该组合支持不完善。

解决方案(推荐)

移除 Druid 依赖,使用 Spring Boot 2.x 默认 HikariCP(性能更优、官方首选):

  1. 删除 pom.xml 中 Druid 依赖;
  2. application.yml 中删除 datasource.druid.* 配置;
  3. 保持 spring.datasource.* 基础配置即可(HikariCP 自动装配)。

📌 官方文档明确指出:“We prefer HikariCP for its performance and concurrency. If HikariCP is available, we always choose it.”

如需自定义 HikariCP 参数,可添加:

spring:
  datasource:
    hikari:
      connection-timeout: 30000
      maximum-pool-size: 20
      minimum-idle: 5

HikariCP 配置项截图:列出 `connection-test-query`、`connection-timeout`、`max-lifetime` 等参数

总结与延伸

本文完整实现了 Spring Batch 在 Spring Boot 2.x 环境下的两大核心场景:

  • 文件驱动批处理FlatFileItemReader + CSV → 业务处理 → DB 写入;
  • 数据库驱动批处理MyBatisCursorItemReader + 动态参数 → 分段处理 → 新表写入;

同时解决了生产环境中常见的 Druid 兼容性陷阱,给出了经过验证的 HikariCP 替代方案,保障稳定性与性能。

Spring Batch 的强大不仅在于功能丰富,更在于其可扩展性

  • 通过 @StepScope 实现 Step 级别 Bean 生命周期与参数注入;
  • 通过 faultTolerant() 配置 retry/skip 策略应对脏数据;
  • 通过 Chunk 机制精细控制事务边界与内存占用;
  • 通过 JobExecutionListener/ItemReadListener 等监听器实现可观测性。

如需进一步深入,可参考云栈社区的 Java 技术板块,获取更多关于 Spring Boot数据库/中间件开源实战 的系统化内容。




上一篇:Mac本地搭建AI编程助手:Ollama运行DeepSeek模型,配置AnythingLLM与VSCode Continue
下一篇:Spring异步上传图片实战:多线程处理水印与缩略图优化响应
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 15:21 , Processed in 0.428491 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表