Why do these threads read the whole file instead of stopping at maxItemCount?

Problem description

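I am using Spring Batch with multithreading. My process receives a large txt file (> 100k lines). I want each thread to process X lines of the file and run some processing on them, to save time.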

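I am using the FlatFileItemReader class and passing each thread its first and last line. I am testing with 19k lines and with 3 and 4 threads. I cannot see why the first thread starts and stops at the correct lines, while the remaining threads start at the correct line but do not stop at the correct one: they read the file until the end.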

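I have read many questions about this topic and about the FlatFileItemReader class, which is not thread-safe. I think my problem lies there, but I set saveState to false and give each thread its own line range. (see this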

These are the XML configuration file and the RangePartitioner class (the steps setTiempoInicial and setTiempoFinal only print the start time and the total time):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <batch:job id="superTxtTest">

        <batch:step id="setTiempoInicial" next="validacionDePers">
            <batch:tasklet transaction-manager="transactionManager"
                start-limit="1">
                <batch:chunk reader="tiempoInicialReader" writer="tiempoInicialWriter"
                    commit-interval="1" skip-limit="1">
                    <batch:skippable-exception-classes>
                        <batch:include
                            class="com.testpartitionfile.batch.PersException" />
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="promotionListener" />
            </batch:listeners>
        </batch:step>

        <batch:step id="validacionDePers" next="setTiempoFinal">
            <batch:partition step="validacionDePersSlave" partitioner="rangePartitioner">
                <batch:handler grid-size="3" task-executor="taskExecutor" />
            </batch:partition>
        </batch:step>

        <batch:step id="setTiempoFinal">
            <batch:tasklet transaction-manager="transactionManager"
                start-limit="1">
                <batch:chunk reader="tiempoFinalReader" writer="tiempoFinalWriter"
                    commit-interval="1" skip-limit="1">
                    <batch:skippable-exception-classes>
                        <batch:include
                            class="com.testpartitionfile.batch.PersException" />
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>

    </batch:job>

    <batch:step id="validacionDePersSlave">
        <batch:tasklet transaction-manager="transactionManager"
            start-limit="1">
            <batch:chunk reader="tratamientoPersReader" writer="validacionPersWriter"
                commit-interval="1" skip-limit="1">
                <batch:skippable-exception-classes>
                    <batch:include
                        class="com.testpartitionfile.batch.PersException" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>

    <bean id="promotionListener"
        class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <property name="keys" value="tiempoInicial" />
    </bean>

    <bean id="validacionPersWriter"
        class="com.testpartitionfile.batch.writer.ValidacionPersWriter"
        scope="step">
        <property name="threadName" value="#{stepExecutionContext[name]}" />
    </bean>

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    <bean id="rangePartitioner"
        class="com.testpartitionfile.batch.partitioner.RangePartitioner" />

    <bean id="tratamientoPersReader" class="org.springframework.batch.item.file.FlatFileItemReader"
        scope="step">
        <property name="resource"
            value="file:C:\Users\user\Desktop\testFile.txt" />
        <property name="encoding" value="utf8" />
        <property name="lineMapper" ref="ficheroPersMapper" />
        <property name="linesToSkip" value="#{stepExecutionContext[fromId]}" />
        <property name="maxItemCount" value="#{stepExecutionContext[toId]}" />
        <property name="saveState" value="false" />
    </bean>

    <bean id="ficheroPersMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="ficheroPersSetMapper" />
        <property name="lineTokenizer">
            <bean
                class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=";" />
                <property name="names"
                    value="a,b,c,d,e,f,g,h,i,j" />
            </bean>
        </property>
    </bean>

    <bean id="ficheroPersSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="prototypeBeanName" value="fichero" />
    </bean>

    <bean id="fichero" class="com.testpartitionfile.batch.dto.Fichero"
        scope="prototype" />

</beans>

RangePartitioner:

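import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        int range = 1;

        // Count the '\n' bytes in the file to derive the number of lines per partition.
        // try-with-resources closes the stream on success as well as on failure.
        try (InputStream is = new BufferedInputStream(
                new FileInputStream("C:\\Users\\user\\Desktop\\testFile.txt"))) {
            byte[] c = new byte[1024];
            int count = 0;
            int readChars = 0;
            while ((readChars = is.read(c)) != -1) {
                for (int i = 0; i < readChars; ++i) {
                    if (c[i] == '\n') {
                        ++count;
                    }
                }
            }
            if (count != 0) {
                range = (count / gridSize);
            }
        } catch (IOException e) {
            // If the file cannot be read, report it; range keeps its default of 1.
            e.printStackTrace();
        }

        // 1-based first and last line of the current partition
        int fromId = 1;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();

            System.out.println("\nHilo : " + i);
            System.out.println("Index Inicial : " + fromId);
            System.out.println("Index Final : " + toId + "\n");

            // fromId - 1 is bound to linesToSkip, toId to maxItemCount (see the XML above)
            value.putInt("fromId", fromId - 1);
            value.putInt("toId", toId);

            // give each thread a name, thread 1,2,3
            value.putString("name", "Hilo " + i);

            result.put("partition" + i, value);

            fromId = toId + 1;
            toId += range;
            // the last partition gets one extra line
            if (i == gridSize - 1) toId = toId + 1;

        }

        return result;
    }

}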

Initial log with 3 threads (labels translated from the Spanish output):

Thread: 1
Initial Index: 1
Final Index: 6333

Thread: 2
Initial Index: 6334
Final Index: 12666

Thread: 3
Initial Index: 12667
Final Index: 19000

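Case 1:

Thread 1 reads up to line 6333. Thread 2 reads up to line 19000 instead of stopping at 12666, and thread 3 reads up to line 19000.

Case 2:

With 4 threads, threads 2 and 3 read up to line 19000.

Why do thread 2 in case 1, and threads 2 and 3 in case 2, read from the initial line (correct) all the way to the end (incorrect)?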

Update 25/07/2021

Issue at: https://github.com/spring-projects/spring-batch/issues/805

Tags: java, spring, multithreading, spring-batch

Solution
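
One possible reading, offered as an assumption rather than something confirmed by the question or the linked issue: maxItemCount may be a count of items read after the linesToSkip lines have been skipped, not an absolute last line number. If so, binding toId to it lets partition 2 read up to 12666 items starting at line 6334, which runs to nearly the end of a 19000-line file. The sketch below computes a skip/count pair per partition instead. PartitionRanges, Range and split are hypothetical names used only for illustration; they are not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRanges {

    /** Hypothetical holder: lines to skip first, then how many items to read. */
    public static final class Range {
        public final int linesToSkip;
        public final int itemCount;

        Range(int linesToSkip, int itemCount) {
            this.linesToSkip = linesToSkip;
            this.itemCount = itemCount;
        }
    }

    /**
     * Splits totalLines into gridSize consecutive ranges.
     * Every range has totalLines / gridSize lines; the last one also
     * takes the remainder, as in the log from the question.
     */
    public static List<Range> split(int totalLines, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        int base = totalLines / gridSize;
        int remainder = totalLines % gridSize;
        int skip = 0;
        for (int i = 0; i < gridSize; i++) {
            boolean last = (i == gridSize - 1);
            int size = last ? base + remainder : base;
            // Assumption: the reader counts items relative to the skipped lines,
            // so each partition is given a size, not an absolute end line.
            ranges.add(new Range(skip, size));
            skip += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (Range r : split(19000, 3)) {
            System.out.println("linesToSkip=" + r.linesToSkip + " itemCount=" + r.itemCount);
        }
    }
}
```

With split(19000, 3), the second partition gets linesToSkip = 6333 and itemCount = 6333, that is, lines 6334 to 12666, which matches the range in the log above. In the partitioner from the question, the equivalent change under the same assumption would be to store toId - fromId + 1 under the toId key.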

