
上周接了个数据迁移的活,要把10万条数据从老系统导入新系统。
写了个简单的批量插入,跑起来一看——5分钟。
领导说太慢了,能不能快点?
折腾了一下午,最后优化到3秒,记录一下过程。
最开始写的很简单,foreach循环插入:
// 方式1:循环单条插入(最慢)
for (User user : userList) {
userMapper.insert(user);
}10万条数据,每条都要走一次网络请求、一次SQL解析、一次事务提交。
算一下:假设每条插入需要3ms,10万条就是300秒 = 5分钟。
这是最蠢的写法,但我见过很多项目都这么写。
把循环插入改成批量SQL:
<!-- Mapper.xml -->
<insert id="batchInsert">
INSERT INTO user (name, age, email) VALUES
<foreach collection="list" item="item" separator=",">
(#{item.name}, #{item.age}, #{item.email})
</foreach>
</insert>
// 分批插入,每批1000条
int batchSize = 1000;
for (int i = 0; i < userList.size(); i += batchSize) {
int end = Math.min(i + batchSize, userList.size());
List<User> batch = userList.subList(i, end);
userMapper.batchInsert(batch);
}从5分钟降到30秒,提升10倍。
“原理:一条SQL插入多条数据,减少网络往返次数。
但还有问题:30秒还是太慢。
MySQL有个参数叫rewriteBatchedStatements,开启后可以把多条INSERT合并成一条。
jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true@Autowired
private SqlSessionFactory sqlSessionFactory;
public void batchInsertWithExecutor(List<User> userList) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
int batchSize = 1000;
for (int i = 0; i < userList.size(); i++) {
mapper.insert(userList.get(i));
if ((i + 1) % batchSize == 0) {
sqlSession.flushStatements();
sqlSession.clearCache();
}
}
sqlSession.flushStatements();
sqlSession.commit();
}
}从30秒降到8秒。
“原理:
ExecutorType.BATCH模式下,MyBatis会缓存SQL,最后一次性发送给数据库执行。配合rewriteBatchedStatements=true,MySQL驱动会把多条INSERT合并。
8秒还是不够快,上多线程:
public void parallelBatchInsert(List<User> userList) {
int threadCount = 4; // 根据数据库连接池大小调整
int batchSize = userList.size() / threadCount;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
int start = i * batchSize;
int end = (i == threadCount - 1) ? userList.size() : (i + 1) * batchSize;
List<User> subList = userList.subList(start, end);
futures.add(executor.submit(() -> {
batchInsertWithExecutor(subList);
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
thrownew RuntimeException(e);
}
}
executor.shutdown();
}从8秒降到3秒。
注意事项:
方案 | 耗时 | 提升倍数 |
|---|---|---|
循环单条插入 | 300秒 | 基准 |
批量SQL | 30秒 | 10倍 |
JDBC批处理 | 8秒 | 37倍 |
多线程并行 | 3秒 | 100倍 |
<foreach collection="list" item="item" separator=",">如果一次插入太多条,SQL会非常长,可能超过max_allowed_packet限制。
解决: 分批插入,每批500-1000条。
检查几个点:
rewriteBatchedStatements=trueExecutorType.BATCH批量插入时想获取自增主键:
<insert id="batchInsert" useGeneratedKeys="true" keyProperty="id">注意:rewriteBatchedStatements=true时,自增主键返回可能有问题,需要升级MySQL驱动到8.0.17+。
10万条数据一次性加载到内存,可能OOM。
解决:分页读取 + 分批插入。
int pageSize = 10000;
int total = countTotal();
for (int i = 0; i < total; i += pageSize) {
List<User> page = selectByPage(i, pageSize);
batchInsertWithExecutor(page);
}@Service
publicclass BatchInsertService {
@Autowired
private SqlSessionFactory sqlSessionFactory;
/**
* 高性能批量插入
* 10万条数据约3秒
*/
public void highPerformanceBatchInsert(List<User> userList) {
if (userList == null || userList.isEmpty()) {
return;
}
int threadCount = Math.min(4, Runtime.getRuntime().availableProcessors());
int batchSize = (int) Math.ceil((double) userList.size() / threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
int start = i * batchSize;
int end = Math.min((i + 1) * batchSize, userList.size());
if (start >= userList.size()) {
latch.countDown();
continue;
}
List<User> subList = new ArrayList<>(userList.subList(start, end));
executor.submit(() -> {
try {
doBatchInsert(subList);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
private void doBatchInsert(List<User> userList) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
for (int i = 0; i < userList.size(); i++) {
mapper.insert(userList.get(i));
if ((i + 1) % 1000 == 0) {
sqlSession.flushStatements();
sqlSession.clearCache();
}
}
sqlSession.flushStatements();
sqlSession.commit();
}
}
}优化点 | 关键配置 |
|---|---|
批量SQL | foreach拼接,分批1000条 |
JDBC批处理 | rewriteBatchedStatements=true+ExecutorType.BATCH |
多线程 | 线程数 ≤ 连接池大小 |
核心原则: 减少网络往返 + 减少事务次数 + 并行处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 [email protected] 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 [email protected] 删除。