上周接手了一个数据迁移任务,需要将10万条数据从旧系统迁移到新系统。写了一个简单的批量插入程序,一运行,结果令人咋舌:整整花了5分钟。领导要求提速,于是花了一个下午进行优化,最终将时间压缩到了3秒左右。以下就是这次优化的完整记录。
最初的方案:循环单条插入(耗时5分钟)
最初的实现非常简单粗暴,就是遍历集合并逐条插入:
// 方式1:循环单条插入(最慢)
for (User user : userList) {
userMapper.insert(user);
}
处理10万条数据时,每条记录都需要经历一次独立的网络请求、SQL解析和事务提交。假设每条插入耗时3毫秒,10万条的总时间就是300秒,也就是5分钟。
这实际上是典型的性能反模式,但在许多项目中依然能见到它的身影。
第一次优化:使用批量SQL语句(耗时30秒)
首要的优化方向是将多次单条插入合并为一条批量插入SQL。
首先,在Mapper XML文件中定义批量插入方法:
<!-- Mapper.xml -->
<insert id="batchInsert">
INSERT INTO user (name, age, email) VALUES
<foreach collection="list" item="item" separator=",">
(#{item.name}, #{item.age}, #{item.email})
</foreach>
</insert>
然后在业务逻辑中,将数据分批进行插入:
// 分批插入,每批1000条
int batchSize = 1000;
for (int i = 0; i < userList.size(); i += batchSize) {
int end = Math.min(i + batchSize, userList.size());
List<User> batch = userList.subList(i, end);
userMapper.batchInsert(batch);
}
经过这一优化,耗时从5分钟大幅降低到30秒,性能提升了10倍。
优化原理:将多条INSERT语句合并为一条,显著减少了与数据库的网络通信次数和SQL解析开销。
虽然提升显著,但30秒对于需要频繁进行数据同步或迁移的场景来说,仍然不够理想。
第二次优化:启用JDBC批处理(耗时8秒)
更进一步的优化是利用JDBC的批处理能力。这里的关键是MySQL连接参数 rewriteBatchedStatements。
第一步:修改数据库连接URL
在JDBC连接字符串中启用批处理重写:
jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
第二步:使用MyBatis的批处理执行器
通过SqlSessionFactory开启批处理模式的会话:
@Autowired
private SqlSessionFactory sqlSessionFactory;
public void batchInsertWithExecutor(List<User> userList) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
int batchSize = 1000;
for (int i = 0; i < userList.size(); i++) {
mapper.insert(userList.get(i));
if ((i + 1) % batchSize == 0) {
sqlSession.flushStatements();
sqlSession.clearCache();
}
}
sqlSession.flushStatements();
sqlSession.commit();
}
}
这次优化将耗时从30秒降低到了8秒。
优化原理:在ExecutorType.BATCH模式下,MyBatis会缓存预编译的SQL语句,最后一次性发送给数据库执行。配合rewriteBatchedStatements=true参数,MySQL驱动会在底层将多条INSERT语句合并,极大提升效率。这是MyBatis框架结合JDBC批处理的经典优化手段。
第三次优化:引入多线程并行处理(耗时3秒)
当单线程批处理达到瓶颈后,可以尝试利用多核CPU的优势进行并行插入。需要注意的是,此方案牺牲了事务的强一致性,适用于允许最终一致性的场景。
public void parallelBatchInsert(List<User> userList) {
int threadCount = 4; // 根据数据库连接池大小调整
int batchSize = userList.size() / threadCount;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
int start = i * batchSize;
int end = (i == threadCount - 1) ? userList.size() : (i + 1) * batchSize;
List<User> subList = userList.subList(start, end);
futures.add(executor.submit(() -> {
batchInsertWithExecutor(subList);
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
executor.shutdown();
}
通过并行处理,最终耗时从8秒进一步降低到了3秒。
实施多线程方案时的注意事项:
- 线程数控制:工作线程数不应超过数据库连接池的最大连接数,否则会出现连接等待,反而降低性能。
- 事务一致性:此方案下,每个线程使用独立的事务,无法保证所有数据要么全部成功要么全部失败。如果业务要求强一致性,则不适合此方案。
- 主键冲突:如果使用数据库自增主键,并行插入时需确保主键生成策略(如步长设置)能避免冲突。
优化效果对比
| 方案 |
耗时 |
提升倍数 |
| 循环单条插入 |
300秒 |
基准 |
| 批量SQL |
30秒 |
10倍 |
| JDBC批处理 |
8秒 |
37倍 |
| 多线程并行 |
3秒 |
100倍 |
实践过程中遇到的“坑”与解决方案
坑1:foreach拼接的SQL语句过长
在Mapper中使用<foreach>标签时,如果单次插入的数据量过大(例如数万条),生成的SQL语句会非常长,可能超过MySQL服务器max_allowed_packet参数的限制,导致执行失败。
解决方案:务必进行分批插入,建议每批500-1000条记录。
坑2:rewriteBatchedStatements参数未生效
配置了参数但性能没有提升?请检查以下几点:
- 连接参数:确保JDBC URL中正确添加了
rewriteBatchedStatements=true。
- 执行器类型:代码中必须使用
SqlSessionFactory.openSession(ExecutorType.BATCH)开启批处理会话。
- 驱动版本:使用较新版本的MySQL JDBC驱动(如8.0.x),旧版本可能对该特性支持不佳。
坑3:批量插入时的自增主键返回问题
在批量插入场景下,如果需要获取每条记录的自增ID,通常会在insert标签中配置useGeneratedKeys和keyProperty。
<insert id="batchInsert" useGeneratedKeys="true" keyProperty="id">
但需要注意的是,在启用rewriteBatchedStatements=true后,自增主键的返回行为在早期驱动版本中可能异常。建议升级MySQL驱动至8.0.17及以上版本以获得更好的支持。
坑4:内存溢出(OOM)风险
一次性将10万条数据全部加载到内存中进行处理,可能会对应用堆内存造成巨大压力,存在OOM风险。
解决方案:采用分页读取和处理的模式,即“读取一批,处理一批,清空一批”。
int pageSize = 10000;
int total = countTotal();
for (int i = 0; i < total; i += pageSize) {
List<User> page = selectByPage(i, pageSize);
batchInsertWithExecutor(page);
}
最终方案代码整合
以下是将上述优化点整合后的一个完整服务类示例:
@Service
public class BatchInsertService {
@Autowired
private SqlSessionFactory sqlSessionFactory;
/**
* 高性能批量插入
* 10万条数据约3秒
*/
public void highPerformanceBatchInsert(List<User> userList) {
if (userList == null || userList.isEmpty()) {
return;
}
int threadCount = Math.min(4, Runtime.getRuntime().availableProcessors());
int batchSize = (int) Math.ceil((double) userList.size() / threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
int start = i * batchSize;
int end = Math.min((i + 1) * batchSize, userList.size());
if (start >= userList.size()) {
latch.countDown();
continue;
}
List<User> subList = new ArrayList<>(userList.subList(start, end));
executor.submit(() -> {
try {
doBatchInsert(subList);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
private void doBatchInsert(List<User> userList) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
for (int i = 0; i < userList.size(); i++) {
mapper.insert(userList.get(i));
if ((i + 1) % 1000 == 0) {
sqlSession.flushStatements();
sqlSession.clearCache();
}
}
sqlSession.flushStatements();
sqlSession.commit();
}
}
}
总结
| 优化点 |
关键配置/操作 |
| 批量SQL |
Mapper中使用<foreach>拼接,代码中分批(如1000条/批)提交 |
| JDBC批处理 |
JDBC URL添加rewriteBatchedStatements=true,结合ExecutorType.BATCH |
| 多线程并行 |
根据连接池大小合理设置线程数,使用CountDownLatch同步 |
性能优化的核心原则可以归结为三点:
- 减少网络往返:通过批量SQL将多次请求合并。
- 减少事务开销:利用JDBC批处理合并数据库端操作。
- 并行处理:在资源允许的前提下,利用多线程并发执行。
从最初的5分钟到最终的3秒,这次优化过程清晰地展示了不同技术手段带来的性能差异。希望这份实战记录能为你处理大数据量插入时提供有价值的参考。在实际开发中,还需要根据具体的数据量、数据库配置和硬件资源进行调整和测试。欢迎在云栈社区分享你的实践经验和遇到的问题。
来源:https://blog.csdn.net/zhangxianhau/article/details/156057385