Making a Spring Batch ItemReader restartable

Problem description

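The Spring Batch program I am working on reads data from a table using the 'org.springframework.batch.item.database.JdbcCursorItemReader' ItemReader. The earlier plan was to alter the table, add a PROCESSED_INDICATOR flag, and pre-populate it with the status "PENDING". Once a record had been processed, the writer would update the PROCESSED_INDICATOR flag to "PROCESSED". This was meant to support restartability: for example, if the batch picks up 1 million records and dies at 500,000, then when I restart the batch it should resume from where it left off. Unfortunately, management did not approve this solution, so I am looking into ways to make the ItemReader itself restartable. According to the Spring documentation, "Most ItemReaders have more sophisticated restart logic. For example, the JdbcCursorItemReader,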

Does anyone have an example of a custom reader that implements JdbcCursorItemReader and stores the last processed row in the cursor? https://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html

==Full XML configuration==

<import resource="classpath:/batch/utility/skip/batch_skip.xml" />
<import resource="classpath:/batch/config/context-postgres.xml" />
<import resource="classpath:/batch/config/oracle-database.xml" />

<context:property-placeholder
    location="classpath:/batch/jobs/TPF-1001-DD-01/TPF-1001-DD-01.properties" />
<bean id="gridSizePartitioner"
    class="com.tpf.partitioner.GridSizePartitioner" />

<task:executor id="taskExecutor" pool-size="${pool.size}" />
<batch:job id="XYZJob" job-repository="jobRepository"
    restartable="true">

    <batch:step id="XYZSTEP">
        <batch:description>Convert TIF files to PDF</batch:description>
        <batch:partition partitioner="gridSizePartitioner">

            <batch:handler task-executor="taskExecutor"
                grid-size="${pool.size}" />
            <batch:step>
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk commit-interval="${commit.interval}"
                        skip-limit="${job.skip.limit}">

                        <batch:reader>
                            <bean id="timeReader"
                                class="org.springframework.batch.item.database.JdbcCursorItemReader"
                                scope="step">
                                <property name="dataSource" ref="oracledataSource" />
                                <property name="sql">
                                    <value>                                     
                                    select TIME_ID as timesheetId,count(*),max(CREATION_DATETIME) as creationDateTime , ILN_NUMBER as ilnNumber
                                    from TS_FAKE_NAME
                                    where creation_datetime  >= '#{jobParameters['creation_start_date1']} 12.00.00.000000000 AM' 
                                    and creation_datetime &lt;  '#{jobParameters['creation_start_date2']} 11.59.59.999999999 PM' 
                                    and mod(time_id,${pool.size})=#{stepExecutionContext['partition.id']} 
                                    group by  time_id ,ILN_NUMBER                                   

                                    </value>
                                </property>
                                <property name="rowMapper">
                                    <bean
                                        class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                                        <property name="mappedClass"
                                            value="com.tpf.model.Time" />
                                    </bean>
                                </property>
                            </bean>
                        </batch:reader>
                        <batch:processor>
                            <bean id="compositeItemProcessor"
                                class="org.springframework.batch.item.support.CompositeItemProcessor">
                                <property name="delegates">
                                    <list>
                                        <ref bean="timeProcessor" />
                                    </list>
                                </property>

                            </bean>
                        </batch:processor>


                        <batch:writer>
                            <bean id="compositeItemWriter"
                                class="org.springframework.batch.item.support.CompositeItemWriter">
                                <property name="delegates">
                                    <list>
                                        <ref bean="timeWriter" />
                                    </list>
                                </property>
                            </bean>
                        </batch:writer>
                        <batch:skippable-exception-classes>
                            <batch:include
                                class="com.utility.skip.BatchSkipException" />
                        </batch:skippable-exception-classes>
                        <batch:listeners>
                            <batch:listener ref="batchSkipListener" />
                        </batch:listeners>
                    </batch:chunk>
                </batch:tasklet>
            </batch:step>
        </batch:partition>
    </batch:step>
    <batch:validator>
        <bean
            class="org.springframework.batch.core.job.DefaultJobParametersValidator">
            <property name="requiredKeys">
                <list>
                    <value>batchRunNumber</value>
                    <value>creation_start_date1</value>
                    <value>creation_start_date2</value>
                </list>
            </property>
        </bean>
    </batch:validator>
</batch:job>



<bean id="timesheetWriter" class="com.tpf.writer.TimeWriter"
    scope="step">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="timeProcessor"
    class="com.tpf.processor.TimeProcessor" scope="step">
    <property name="dataSource" ref="oracledataSource" />
</bean> 

Tags: spring-batch

Solution


"Does anyone have an example of a custom reader that implements JdbcCursorItemReader and stores the last processed row in the cursor?"

JdbcCursorItemReader already does this. See its Javadoc; here is an excerpt:

ExecutionContext: The current row is returned as restart data,
and when restored from that same data, the cursor is opened and the current row
set to the value within the restart data.

So you do not need a custom reader.
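
As a rough sketch of what relying on the built-in restart support could look like for the reader in the question: the reader saves a count of the rows it has read into the step's ExecutionContext as chunks are committed (this is controlled by saveState, which defaults to true), and on restart it reopens the cursor and moves forward to that row. Because the saved position is a row count, the query has to return rows in the same order on every run. The `order by` clause below is not in the original query; it is an assumption about what would give a stable ordering for this data and may need adjusting.

```xml
<!-- Sketch only: the reader from the question with restart-related settings made explicit. -->
<bean id="timeReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="oracledataSource" />
    <!-- saveState is true by default; it is set here only to make the behaviour visible. -->
    <property name="saveState" value="true" />
    <!-- Assumption: an "order by" is added so a restarted cursor sees rows in the same order. -->
    <property name="sql">
        <value>
        select TIME_ID as timesheetId, count(*), max(CREATION_DATETIME) as creationDateTime, ILN_NUMBER as ilnNumber
        from TS_FAKE_NAME
        where creation_datetime >= '#{jobParameters['creation_start_date1']} 12.00.00.000000000 AM'
        and creation_datetime &lt; '#{jobParameters['creation_start_date2']} 11.59.59.999999999 PM'
        and mod(time_id,${pool.size})=#{stepExecutionContext['partition.id']}
        group by time_id, ILN_NUMBER
        order by time_id, ILN_NUMBER
        </value>
    </property>
    <property name="rowMapper">
        <bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
            <property name="mappedClass" value="com.tpf.model.Time" />
        </bean>
    </property>
</bean>
```

In a partitioned step like the one in the question, each partition's step execution has its own ExecutionContext, so each step-scoped reader instance would restore its own position. Note that this approach assumes the underlying rows for the given date range do not change between the failed run and the restart.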

