spring-batch - Spring批处理:多线程步骤错误配置
问题描述
这是我的步骤:
@Bean
public Step autors(
ItemReader<Autor> autorItemReader,
AutorMappingItemProcessor processor,
AutorPipeliningItemWriter unitatPipeliningWriter
) {
return this.stepBuilderFactory
.get("autors")
.<Autor, AutorDenormalized>chunk(100)
.reader(autorItemReader)
.processor(processor)
.writer(unitatPipeliningWriter)
.build();
}
它工作得很好。
现在,我需要处理 bt“查克专用线程”。
我添加了这个配置:
@Bean
public Step autors(
ItemReader<Autor> autorItemReader,
AutorMappingItemProcessor processor,
AutorPipeliningItemWriter unitatPipeliningWriter,
TaskExecutor taskExecutor
) {
return this.stepBuilderFactory
.get("autors")
.<Autor, AutorDenormalized>chunk(100)
.reader(autorItemReader)
.processor(processor)
.writer(unitatPipeliningWriter)
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
这里出现了问题,因为我收到了这些消息:
HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)
com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
或者
HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)
com.microsoft.sqlserver.jdbc.SQLServerException: The TDS protocol stream is not valid.
我的相关ItemReader
是:
@Bean
public ItemReader<Autor> autorReader() {
String sql = "select * from ...";
JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
jdbcCursorItemReader.setDataSource(this.dataSource);
jdbcCursorItemReader.setSql(sql);
jdbcCursorItemReader.setVerifyCursorPosition(false);
jdbcCursorItemReader.setRowMapper(this.autorMapper);
return jdbcCursorItemReader;
}
我的数据源是:
@Bean
@JobDataSource
@ConfigurationProperties(prefix = "spring.job-datasource")
public DataSource jobDataSource() {
return DataSourceBuilder.create().build();
}
和属性:
spring.job-datasource.jdbcUrl=jdbc:sqlserver://localhost;databaseName=ac_img_p
spring.job-datasource.username=sa
spring.job-datasource.password=StR0nGp4ss.
spring.job-datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.job-datasource.initialization-mode=always
有任何想法吗?
解决方案
JdbcCursorItemReader
不是线程安全的,因为它扩展了AbstractItemCountingItemStreamItemReader
不是线程安全的。所以在多线程步骤中使用它是不正确的。你可以做的是用一个装饰它SynchronizedItemStreamReader
:
@Bean
public SynchronizedItemStreamReader<Autor> autorReader() {
String sql = "select * from ...";
JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
jdbcCursorItemReader.setDataSource(this.dataSource);
jdbcCursorItemReader.setSql(sql);
jdbcCursorItemReader.setVerifyCursorPosition(false);
jdbcCursorItemReader.setRowMapper(this.autorMapper);
SynchronizedItemStreamReader<Autor> synchronizedReader = new SynchronizedItemStreamReader<>();
synchronizedReader.setDelegate(jdbcCursorItemReader)
return synchronizedReader;
}
否则,您可以使用像JdbcPagingItemReader
.
附带说明一下,您的autorReader
方法应该返回实际类型,或者至少ItemStreamReader<Autor>
让 Spring Batch 正确代理您的阅读器。
推荐阅读
- ruby-on-rails - Rspec 挂在 Ruby 2.7.4 上,没有输出
- python - Tweepy 错误(AttributeError:'str' 对象没有属性'request')
- java - 尽管有 try-catch 块,但未报告的异常
- java - 将分数作为字符串输入并转换为 int
- google-cloud-storage - 使用 Python API 以最低权限从 Google Cloud Storage 读取数据
- css - 如何缩小 css 类之间重复的 css 属性代码?
- javascript - 在 next.js/next-auth.js 中使用凭据身份验证的正确方法
- python - 在Python中用一个字符替换字符串中的多个字符
- c - C 程序:memcpy 正在复制奇怪的数据
- java - 如何添加到 JFrame 3 JPanel:第一个 JPanel 占据框架的 90%,第二个在窄顶部,第三个是可以关闭的抽屉面板