Using a Spring Batch multi-threaded Step with RepositoryItemWriter / RepositoryItemReader to process large data

Problem description

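I'm trying to write a batch application using Spring Batch with a multi-threaded step. It is a simple application that reads data from one table and writes it to another, but the data volume is large: about 2 million records.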

I'm using RepositoryItemReader and RepositoryItemWriter to read and write the data. However, after processing part of the data, the job fails because it cannot acquire a JDBC connection.

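// Config.java

@Bean
public TaskExecutor taskExecutor() {
    // SimpleAsyncTaskExecutor starts a new thread for each task; the concurrency
    // limit caps the number of threads running at the same time at 10
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(10);
    return taskExecutor;
}

@Bean(name = "personJob")
public Job personKeeperJob() {

    // Multi-threaded step: up to 10 chunks of 1000 items are processed concurrently,
    // each chunk on its own worker thread and in its own transaction
    Step step = stepBuilderFactory.get("step-1")
            .<User, Person> chunk(1000)
            .reader(userReader)
            .processor(jpaProcessor)
            .writer(personWriter)
            .taskExecutor(taskExecutor())
            .throttleLimit(10)
            .build();

    Job job = jobBuilderFactory.get("person-job")
            .incrementer(new RunIdIncrementer())
            .listener(this)
            .start(step)
            .build();

    return job;
}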
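A multi-threaded step hands whole chunks to the worker threads: each thread reads up to 1000 items, processes them and writes them in one transaction, then moves on to the next chunk, so 2 million records become 2000 chunk transactions spread over at most 10 threads. The plain-Java sketch below models only that division of work. It uses no Spring Batch classes; ChunkStepSketch, run and the AtomicInteger cursor are hypothetical stand-ins for the step, the reader and the task executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java model of a multi-threaded chunk-oriented step; no Spring Batch classes are used.
class ChunkStepSketch {

    // Totals reported by the simulated step
    static final class Result {
        final long itemsWritten;
        final int chunksCommitted;

        Result(long itemsWritten, int chunksCommitted) {
            this.itemsWritten = itemsWritten;
            this.chunksCommitted = chunksCommitted;
        }
    }

    static Result run(int totalItems, int chunkSize, int threads) {
        AtomicInteger nextIndex = new AtomicInteger(0); // shared "reader" cursor over the source table
        AtomicLong written = new AtomicLong(0);         // rows the "writer" has stored
        AtomicInteger chunks = new AtomicInteger(0);    // chunk transactions "committed"
        // Hypothetical stand-in for the 10-thread TaskExecutor used by the step
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                while (true) {
                    // Read: atomically claim the next chunkSize items so threads never overlap
                    int start = nextIndex.getAndAdd(chunkSize);
                    if (start >= totalItems) {
                        return; // source exhausted, this worker stops
                    }
                    int end = Math.min(start + chunkSize, totalItems);
                    List<Integer> chunk = new ArrayList<>(end - start);
                    for (int i = start; i < end; i++) {
                        chunk.add(i); // Process: here the item is just its id
                    }
                    // Write + commit: in Spring Batch the whole chunk shares one transaction
                    written.addAndGet(chunk.size());
                    chunks.incrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Result(written.get(), chunks.get());
    }

    public static void main(String[] args) {
        Result r = run(2_000_000, 1000, 10);
        System.out.println(r.itemsWritten + " items in " + r.chunksCommitted + " chunks"); // 2000000 items in 2000 chunks
    }
}
```

Each of those chunk transactions is open on its own thread at the same time as up to nine others, which is why the step's concurrency directly drives how many database connections are needed at once.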


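// Processor.java

@Override
public Person process(User user) throws Exception {
    // Looks the user up again by id: one extra query per item,
    // executed inside the chunk transaction on the worker thread
    Optional<User> userFromDb = userRepo.findById(user.getUserId());
    Person person = new Person();

    if (userFromDb.isPresent()) {
        person.setName(userFromDb.get().getName());
        person.setUserId(userFromDb.get().getUserId());
        person.setDept(userFromDb.get().getDept());
    }

    // If the user was not found, an empty Person is still returned and written;
    // returning null instead would filter the item out of the chunk
    return person;
}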


// Reader.java

@Autowired
public UserItemReader(final UserRepository repository) {
    super();
    this.repository = repository;
}

@PostConstruct
protected void init() {
    // Page through the table with findAll(Pageable), sorted by userId ascending
    final Map<String, Sort.Direction> sorts = new HashMap<>();
    sorts.put("userId", Sort.Direction.ASC);
    this.setRepository(this.repository);
    this.setSort(sorts);
    this.setMethodName("findAll");
}
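Because every worker thread of a multi-threaded step calls read() on the same reader bean, the reader's paging state (current page, position within the page) is shared between threads. Spring Batch ships SynchronizedItemStreamReader for wrapping a reader whose read() is not safe to call concurrently; whether a particular reader needs it should be checked against its Javadoc for the Spring Batch version in use. The plain-Java sketch below shows the idea under stated assumptions: PagingReader and the in-memory table are hypothetical stand-ins for the repository reader and the users table, and SynchronizedReader only mimics the wrapper with a single lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class SynchronizedReaderSketch {

    // Hypothetical paging reader: fetches pageSize rows at a time from a table already sorted by id
    static final class PagingReader {
        private final List<Integer> table;
        private final int pageSize;
        private int page = 0;                                    // index of the next page to fetch
        private List<Integer> current = Collections.emptyList(); // rows of the current page
        private int pos = 0;                                     // position inside the current page

        PagingReader(List<Integer> table, int pageSize) {
            this.table = table;
            this.pageSize = pageSize;
        }

        // Unsynchronized: page, current and pos are shared mutable state
        Integer read() {
            if (pos >= current.size()) {
                int from = page * pageSize;
                if (from >= table.size()) {
                    return null; // end of data, like a Spring Batch reader returning null
                }
                current = table.subList(from, Math.min(from + pageSize, table.size()));
                page++;
                pos = 0;
            }
            return current.get(pos++);
        }
    }

    // Serializes every read() on one lock, the same idea as SynchronizedItemStreamReader
    static final class SynchronizedReader {
        private final PagingReader delegate;

        SynchronizedReader(PagingReader delegate) {
            this.delegate = delegate;
        }

        synchronized Integer read() {
            return delegate.read();
        }
    }

    // Reads the whole table with several threads sharing one reader; returns the ids seen, sorted
    static List<Integer> readAll(int rows, int pageSize, int threads) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            table.add(i);
        }
        SynchronizedReader reader = new SynchronizedReader(new PagingReader(table, pageSize));
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Integer> result = new ArrayList<>(seen);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = readAll(10_000, 10, 10);
        System.out.println(ids.size()); // 10000
    }
}
```

With the lock, every id from 0 to 9999 is read exactly once; removing synchronized from SynchronizedReader.read() lets threads race on page and pos, which can duplicate or skip items.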

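// Writer.java

@PostConstruct
protected void init() {
    this.setRepository(repository);
}

@Transactional
public void write(List<? extends Person> persons) throws Exception {
    // The step already wraps each chunk in a transaction, so @Transactional
    // (default REQUIRED propagation) joins it rather than opening a new one
    repository.saveAll(persons);
}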


application.properties

# Datasource

spring.datasource.platform=h2
spring.datasource.url=jdbc:h2:mem:batchdb
spring.main.allow-bean-definition-overriding=true
spring.datasource.hikari.maximum-pool-size=500

Error:

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:447)
  ......................
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)

............................

Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30927ms.

Tags: multithreading, hibernate, spring-batch

Solution


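You are running out of connections.

In a multi-threaded step, every chunk in flight holds a JDBC connection until its transaction commits, so the pool must be at least as large as the number of step threads (10 here), plus some headroom for other database access. Try setting the Hikari connection pool to a larger size: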

spring.datasource.hikari.maximum-pool-size=20
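The sizing rule can be checked with a small simulation. This is a plain-Java sketch, not HikariCP: a Semaphore stands in for the connection pool, each worker keeps its permit for a fixed time just as a chunk transaction keeps its connection until commit, and a worker that cannot get a permit within the timeout fails, analogous to HikariCP's "Connection is not available, request timed out" error (HikariCP's connectionTimeout defaults to 30 seconds). PoolSizingSketch and timeouts are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simulation only: the Semaphore stands in for a connection pool such as HikariCP.
class PoolSizingSketch {

    // Each of `threads` workers borrows one "connection", holds it for holdMillis
    // (one chunk transaction), then returns it. Returns how many borrows timed out.
    static int timeouts(int poolSize, int threads, long holdMillis, long timeoutMillis) {
        Semaphore pool = new Semaphore(poolSize);
        AtomicInteger failed = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1); // release all workers at the same moment
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                try {
                    go.await();
                    if (!pool.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        failed.incrementAndGet(); // compare: "Connection is not available, request timed out"
                        return;
                    }
                    try {
                        Thread.sleep(holdMillis); // connection stays checked out until commit
                    } finally {
                        pool.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        go.countDown();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failed.get();
    }

    public static void main(String[] args) {
        // Pool larger than the 10 step threads: every borrow succeeds
        System.out.println(timeouts(20, 10, 500, 50)); // 0
        // Pool smaller than the 10 step threads: the surplus workers time out
        System.out.println(timeouts(5, 10, 500, 50));
    }
}
```

With 20 permits and 10 workers no borrow times out; with 5 permits the workers beyond the fifth typically time out, because the first five hold their permits far longer than the others are willing to wait.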
