multithreading - 使用 Spring Batch 多线程 Step 和 RepositoryItemWriter/ RepositoryItemReader 处理大数据
问题描述
我正在尝试使用带有多线程步骤的 Spring Batch 编写批处理应用程序。这是简单的应用程序从表中读取数据并写入另一个表,但数据量很大,大约有 200 万条记录。
我正在使用 RepositoryItemReader 和 RepositoryItemWriter 来读取和写入数据。但是在处理一些数据后,由于无法获取 JDBC 连接而失败。
//Config.Java
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(10);
return taskExecutor;
}
@Bean(name = "personJob")
public Job personKeeperJob() {
Step step = stepBuilderFactory.get("step-1")
.<User, Person> chunk(1000)
.reader(userReader)
.processor(jpaProcessor)
.writer(personWriter)
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
Job job = jobBuilderFactory.get("person-job")
.incrementer(new RunIdIncrementer())
.listener(this)
.start(step)
.build();
return job;
}
//Processor.Java
@Override
public Person process(User user) throws Exception {
Optional<User> userFromDb = userRepo.findById(user.getUserId());
Person person = new Person();
if(userFromDb.isPresent()) {
person.setName(userFromDb.get().getName());
person.setUserId(userFromDb.get().getUserId());
person.setDept(userFromDb.get().getDept());
}
return person;
}
//Reader.Java
@Autowired
public UserItemReader(final UserRepository repository) {
super();
this.repository = repository;
}
@PostConstruct
protected void init() {
final Map<String, Sort.Direction> sorts = new HashMap<>();
sorts.put("userId", Direction.ASC);
this.setRepository(this.repository);
this.setSort(sorts);
this.setMethodName("findAll");
}
//Writer.Java
@PostConstruct
protected void init() {
this.setRepository(repository);
}
@Transactional
public void write(List<? extends Person> persons) throws Exception {
repository.saveAll(persons);
}
application.properties
# Datasource
spring.datasource.platform=h2
spring.datasource.url=jdbc:h2:mem:batchdb
spring.main.allow-bean-definition-overriding=true
spring.datasource.hikari.maximum-pool-size=500
Error :
org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:447)
......................
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)
............................
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30927ms.
解决方案
你的连接用完了。
尝试将 Hikari Connection Pool 设置为更大的数字:
spring.datasource.hikari.maximum-pool-size=20
推荐阅读
- react-native - 无法在生产中打开我的应用程序 | DeepLink React Native | 反应导航 v5/v6
- javascript - 根据 id 动态设置 html 元素样式
- angular - 如何导出地图以在其他组件中重复使用?
- ios - 由于未捕获的异常“NSInternalInconsistencyException”而终止应用程序,原因:Flutter 中的“标准消息损坏”
- spring-kafka - spring kafka ConcurrentMessageListenerContainer consumerr中的线程名称约定
- react-native - 我怎样才能在 false 上设置模式可见?
- c++ - 我们可以使用 node.js 子进程从服务器运行 openGL 项目吗?
- google-analytics - 如何删除谷歌标签管理器容器?
- react-native - 键盘避免视图无法与模态一起正常工作
- python - TypeError: cannot unpack non-iterable NoneType object 。它给出了随机循环中的错误