Spring Batch: multi-threaded step misconfiguration

Question

This is my step:

@Bean
public Step autors(
    ItemReader<Autor> autorItemReader,
    AutorMappingItemProcessor processor,
    AutorPipeliningItemWriter unitatPipeliningWriter
) {
    return this.stepBuilderFactory
        .get("autors")
        .<Autor, AutorDenormalized>chunk(100)
        .reader(autorItemReader)
        .processor(processor)
        .writer(unitatPipeliningWriter)
        .build();
}

It works fine.

Now I need to process each chunk in its own dedicated thread.

I added this configuration:

@Bean
public Step autors(
    ItemReader<Autor> autorItemReader,
    AutorMappingItemProcessor processor,
    AutorPipeliningItemWriter unitatPipeliningWriter,
    TaskExecutor taskExecutor
) {
    return this.stepBuilderFactory
        .get("autors")
        .<Autor, AutorDenormalized>chunk(100)
        .reader(autorItemReader)
        .processor(processor)
        .writer(unitatPipeliningWriter)
        .taskExecutor(taskExecutor)
        .throttleLimit(4)
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

This is where the problem appears, because I get messages like these:

HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)

com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.

or

HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)

com.microsoft.sqlserver.jdbc.SQLServerException: The TDS protocol stream is not valid.

My related ItemReader is:

@Bean
public ItemReader<Autor> autorReader() {
    String sql = "select * from ...";

    JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
    jdbcCursorItemReader.setDataSource(this.dataSource);
    jdbcCursorItemReader.setSql(sql);
    jdbcCursorItemReader.setVerifyCursorPosition(false);
    jdbcCursorItemReader.setRowMapper(this.autorMapper);

    return jdbcCursorItemReader;
}

My data source is:

@Bean
@JobDataSource
@ConfigurationProperties(prefix = "spring.job-datasource")
public DataSource jobDataSource() {
    return DataSourceBuilder.create().build();
}

And the properties:

spring.job-datasource.jdbcUrl=jdbc:sqlserver://localhost;databaseName=ac_img_p
spring.job-datasource.username=sa
spring.job-datasource.password=StR0nGp4ss.
spring.job-datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.job-datasource.initialization-mode=always

Any ideas?

Tags: spring-batch, hikaricp

Answer


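JdbcCursorItemReader is not thread-safe, because it extends AbstractItemCountingItemStreamItemReader, which is not thread-safe either. Using it directly in a multi-threaded step is therefore incorrect: several threads end up reading from the same cursor and connection at once, which is consistent with the broken-connection errors you are seeing. What you can do is decorate it with a SynchronizedItemStreamReader: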

@Bean
public SynchronizedItemStreamReader<Autor> autorReader() {
   String sql = "select * from ...";

   JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
   jdbcCursorItemReader.setDataSource(this.dataSource);
   jdbcCursorItemReader.setSql(sql);
   jdbcCursorItemReader.setVerifyCursorPosition(false);
   jdbcCursorItemReader.setRowMapper(this.autorMapper);

   SynchronizedItemStreamReader<Autor> synchronizedReader = new SynchronizedItemStreamReader<>();
   synchronizedReader.setDelegate(jdbcCursorItemReader);
   return synchronizedReader;
}
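
To illustrate why the decorator helps, here is a minimal, JDK-only sketch of the same idea. SimpleReader, CursorLikeReader and SynchronizedReader are hypothetical stand-ins invented for this example, not Spring Batch classes; the point is only that a reader with a shared, unsynchronized position becomes safe to call from several threads once every read() goes through a synchronized wrapper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedReaderSketch {

    /** Hypothetical stand-in for an item reader: returns null when exhausted. */
    interface SimpleReader<T> {
        T read();
    }

    /** Not thread-safe: the position is shared, unsynchronized state (much like a JDBC cursor). */
    static class CursorLikeReader<T> implements SimpleReader<T> {
        private final List<T> rows;
        private int position = 0;

        CursorLikeReader(List<T> rows) {
            this.rows = rows;
        }

        @Override
        public T read() {
            if (position >= rows.size()) {
                return null;
            }
            return rows.get(position++);
        }
    }

    /** Decorator that lets only one thread at a time call the delegate. */
    static class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    /** Drains the reader from several threads and returns every item that was read. */
    static List<Integer> drainConcurrently(SimpleReader<Integer> reader, int threads) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            rows.add(i);
        }
        SimpleReader<Integer> reader = new SynchronizedReader<>(new CursorLikeReader<>(rows));
        List<Integer> seen = drainConcurrently(reader, 4);
        System.out.println("items read: " + seen.size());
    }
}
```

Passing the bare CursorLikeReader to drainConcurrently instead may yield duplicated or skipped items, which is the in-memory analogue of several step threads fighting over one cursor. Note that the wrapper only serializes reads; processing and writing still run in parallel.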

Otherwise, you can use a thread-safe reader such as JdbcPagingItemReader.
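
A paging reader might be configured roughly as follows. This is only a sketch under assumptions: the question elides the actual query, so the table name autor_table and the sort column id are hypothetical placeholders, and Autor stands for the question's own domain class. A paging reader needs a unique sort key to page reliably, and saveState(false) is used because restart state is not meaningful when several threads read concurrently.

```java
import java.util.Collections;
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;

@Configuration
public class PagingReaderSketch {

    // Autor is the domain class from the question; it is not defined here.
    @Bean
    public JdbcPagingItemReader<Autor> autorPagingReader(DataSource dataSource,
                                                         RowMapper<Autor> autorMapper) {
        return new JdbcPagingItemReaderBuilder<Autor>()
            .name("autorPagingReader")
            .dataSource(dataSource)
            .selectClause("select *")
            // Hypothetical table name: the original query is elided in the question.
            .fromClause("from autor_table")
            // Hypothetical unique column used to order and split the pages.
            .sortKeys(Collections.singletonMap("id", Order.ASCENDING))
            .rowMapper(autorMapper)
            // Matches the chunk size of 100 used by the step.
            .pageSize(100)
            // Do not store restart state when reading from multiple threads.
            .saveState(false)
            .build();
    }
}
```

Since the application in the question has more than one data source, the DataSource parameter would presumably need the same @JobDataSource qualifier used there.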

As a side note, your autorReader method should return the actual type, or at least ItemStreamReader<Autor>, so that Spring Batch can proxy your reader correctly.

