spring-batch - 使 Spring Batch ItemReader 可重新启动
问题描述
我正在处理的 spring Batch 程序正在从表中读取数据。它正在使用 'org.springframework.batch.item.database.JdbcCursorItemReader' itemReader 。早些时候的计划是更改表并添加一个 PROCESSED_INDICATOR 标志并使用状态“PENDING”预填充它。处理完记录后,编写器会将 PROCESSED_INDICATOR 标志的状态更新为“已处理”。这是为了支持可重启性。例如,如果批处理获取 100 万条记录并在 50 万条记录中死亡,那么当我重新启动批处理时;它应该从我离开的地方开始。但不幸的是,管理层没有批准这个解决方案。我正在研究使 itemreader 可重新启动的方法。根据 Spring 文档,“大多数 ItemReader 具有更复杂的重启逻辑。例如,JdbcCursorItemReader,
有没有人有任何实现 JdbcCursorItemReader 并将最后处理的行存储在游标中的自定义阅读器的示例示例。 https://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html
==完整的 XML 配置==
<import resource="classpath:/batch/utility/skip/batch_skip.xml" />
<import resource="classpath:/batch/config/context-postgres.xml" />
<import resource="classpath:/batch/config/oracle-database.xml" />
<context:property-placeholder
location="classpath:/batch/jobs/TPF-1001-DD-01/TPF-1001-DD-01.properties" />
<bean id="gridSizePartitioner"
class="com.tpf.partitioner.GridSizePartitioner" />
<task:executor id="taskExecutor" pool-size="${pool.size}" />
<batch:job id="XYZJob" job-repository="jobRepository"
restartable="true">
<batch:step id="XYZSTEP">
<batch:description>Convert TIF files to PDF</batch:description>
<batch:partition partitioner="gridSizePartitioner">
<batch:handler task-executor="taskExecutor"
grid-size="${pool.size}" />
<batch:step>
<batch:tasklet allow-start-if-complete="true">
<batch:chunk commit-interval="${commit.interval}"
skip-limit="${job.skip.limit}">
<batch:reader>
<bean id="timeReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader"
scope="step">
<property name="dataSource" ref="oracledataSource" />
<property name="sql">
<value>
select TIME_ID as timesheetId,count(*),max(CREATION_DATETIME) as creationDateTime , ILN_NUMBER as ilnNumber
from TS_FAKE_NAME
where creation_datetime >= '#{jobParameters['creation_start_date1']} 12.00.00.000000000 AM'
and creation_datetime < '#{jobParameters['creation_start_date2']} 11.59.59.999999999 PM'
and mod(time_id,${pool.size})=#{stepExecutionContext['partition.id']}
group by time_id ,ILN_NUMBER
</value>
</property>
<property name="rowMapper">
<bean
class="org.springframework.jdbc.core.BeanPropertyRowMapper">
<property name="mappedClass"
value="com.tpf.model.Time" />
</bean>
</property>
</bean>
</batch:reader>
<batch:processor>
<bean id="compositeItemProcessor"
class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<ref bean="timeProcessor" />
</list>
</property>
</bean>
</batch:processor>
<batch:writer>
<bean id="compositeItemWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="timeWriter" />
</list>
</property>
</bean>
</batch:writer>
<batch:skippable-exception-classes>
<batch:include
class="com.utility.skip.BatchSkipException" />
</batch:skippable-exception-classes>
<batch:listeners>
<batch:listener ref="batchSkipListener" />
</batch:listeners>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
<batch:validator>
<bean
class="org.springframework.batch.core.job.DefaultJobParametersValidator">
<property name="requiredKeys">
<list>
<value>batchRunNumber</value>
<value>creation_start_date1</value>
<value>creation_start_date2</value>
</list>
</property>
</bean>
</batch:validator>
</batch:job>
<bean id="timesheetWriter" class="com.tpf.writer.TimeWriter"
scope="step">
<property name="dataSource" ref="dataSource" />
</bean>
<bean id="timeProcessor"
class="com.tpf.processor.TimeProcessor" scope="step">
<property name="dataSource" ref="oracledataSource" />
</bean>
解决方案
有没有人有任何实现 JdbcCursorItemReader 并将最后处理的行存储在游标中的自定义阅读器的示例
这样JdbcCursorItemReader
做,请参阅Javadoc,这是一个摘录:
ExecutionContext: The current row is returned as restart data,
and when restored from that same data, the cursor is opened and the current row
set to the value within the restart data.
因此,您不需要自定义阅读器。
推荐阅读
- coldfusion - 在 Coldfusion 中为 Google 服务帐户创建 JWT
- javascript - Bootstrap 下拉菜单按钮未与 Leaflet 地图一起显示
- teradata - 希望使用 Housing_train 数据集来实践 Teradata Aster
- python - 无法安装编解码器也无法升级 pip
- java - 碰撞检测仅适用于墙的顶部 - Java
- javascript - 如何使用 processing.js 使贝塞尔点的动画更平滑/更慢
- reactjs - 将状态传递给模态(子组件)不会改变子组件内部的状态
- python - 编码一串数字(递归?)
- node.js - 同步 Promise 以初始化 MongoDB 数据库
- angular - Angular (v4) DatePipe 与 UTC 日期时间作为本地时间