MultiResourceItemReader keeps the last file it read locked, so the file cannot be moved after processing

Problem description

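Any ideas why MultiResourceItemReader keeps the last file locked, so that I cannot move it with a move Tasklet? The move completes for every other file, but not for the last file that was read.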

The IOException is:

java.nio.file.FileSystemException: C:\Users\UGDW\MyProjects\ngsa2\oab-outside-assets-batchlauncher\input\EQ_AcctData_4321_03292020.csv -> C:\Users\UGDW\MyProjects\ngsa2\oab-outside-assets-batchlauncher\output\EQ_AcctData_4321_03292020.csv_processed: The process cannot access the file because it is being used by another process.

Batch configuration (trimmed):

<batch:job id="stockPlanAccountDataJob">
    <batch:step id="getFilesInInputDirectory" next="fileProcessing">
        <tasklet ref="getFilesInInputDirectoryTasklet"/>
    </batch:step>

    <batch:step id="fileProcessing" next="moveFilesToOuputDirectory">                         
        <tasklet>
            <chunk reader="stockPlanAccountDataFileReader" processor="stockPlanAccountDataProcessor" writer="stockPlanConsoleItemWriter" 
                    commit-interval="20" skip-limit="20">                   
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception"/>  
                    <batch:exclude class="org.springframework.batch.item.file.FlatFileParseException"/>  
                </batch:skippable-exception-classes>                        
            </chunk>
        </tasklet>
    </batch:step>

    <batch:step id="moveFilesToOuputDirectory">
        <tasklet ref="stockPlanMoveFilesTasklet"/>
    </batch:step>       
</batch:job>

<bean id="getFilesInInputDirectoryTasklet" class="simplepeekandmulti.GetFilesInInputDirectoryTasklet" scope="step"/>

<bean id="stockPlanAccountDataFileReader" class="simplepeekandmulti.StockPlanAccountDataFileReader" scope="step">
    <property name="delegate" ref="preprocessorUsingPeekable"/>
</bean>  

<bean id="preprocessorUsingPeekable" class="org.springframework.batch.item.support.SingleItemPeekableItemReader" scope="step">
    <property name="delegate" ref="multiFileResourceReader"/>
</bean>      

<bean name="multiFileResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step"> 
    <property name="resources" value="file:#{jobExecutionContext[filepattern]}" />
    <property name="delegate" ref="genericFlatFileReader" />
    <property name="strict" value="true" />
</bean>

<bean id="genericFlatFileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="lineMapper" ref="genericFileLineMapper"/>    
</bean>

<bean name="genericFileLineMapper" class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" scope="step" />

<bean id="stockPlanAccountDataProcessor" class="simplepeekandmulti.StockPlanAccountDataProcessor" scope="step"/>

<bean id="stockPlanMoveFilesTasklet" class="simplepeekandmulti.StockPlanMoveFilesTasklet" scope="step"/>

Reader (with silly logic):

package simplepeekandmulti;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.PeekableItemReader;

import simplepeekandmulti.StockPlanAccountData;
import simplepeekandmulti.StockPlanFileInputAccountData;

public class StockPlanAccountDataFileReader implements ItemReader<StockPlanFileInputAccountData> {

    private PeekableItemReader<String> delegate;
    private AtomicLong itemsRead = new AtomicLong(0L);
    private static final String PIPE = "|";
    private static final String PIPE_SPLIT = "\\|";
    private static final int NUM_RECORDS_PER_LINE = 6;

    public PeekableItemReader<String> getDelegate() {
        return delegate;
    }

    public void setDelegate(PeekableItemReader<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public StockPlanFileInputAccountData read() throws Exception {
        String currentLine = delegate.read();
        StockPlanFileInputAccountData inputData = new StockPlanFileInputAccountData();
        int recs = 0;
        List<String> errorList = new ArrayList<>();

        while (currentLine != null) {
            if (currentLine.contains(PIPE)) {
                recs++;
                setDetailLine(currentLine, inputData, recs, errorList);
            } else {
                errorList.add(currentLine);
            }
            if ((errorList.size() % 2) == 0) {
                return inputData;
            }
            itemsRead.incrementAndGet();
            currentLine = delegate.read();
        }
        return null;
    }

    private void setDetailLine(String inputLine, StockPlanFileInputAccountData inputData, 
            int numRecs, List<String> errorList) {

        String[] entry = inputLine.split(PIPE_SPLIT);
        if (entry.length == NUM_RECORDS_PER_LINE) {
            inputData.setDataRecordsPerFile(numRecs);
            StockPlanAccountData data = new StockPlanAccountData();
            data.setExternalClientId(entry[0]);
            data.setSSN(entry[1]);
            data.setExternalParticipantId(entry[2]);
            data.setFirstName(entry[3]);
            data.setLastName(entry[4]);
            data.setDateOfBitrth(entry[5]);
            inputData.addToDataList(data);
        } else {
            errorList.add("Detail Line Is Invalid, Does NOT have 6 columns, 5 pipes: " + inputLine);
        }
    }
}

Processor:

package simplepeekandmulti;

import java.util.ArrayList;
import java.util.List;


import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemProcessor;

import simplepeekandmulti.StockPlanFileInputAccountData;

public class StockPlanAccountDataProcessor implements ItemProcessor<StockPlanFileInputAccountData, StockPlanFileInputAccountData> {

    private StepExecution stepExecution;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public StockPlanFileInputAccountData process(StockPlanFileInputAccountData item) throws Exception {

        List<String> errorList = new ArrayList<>(0);

        if (errorList.isEmpty()) {
            return item;
        } else {
            //exchangeEmailService.sendEmail(fileName, errorList);
            return null;
        }
    }

}

Writer:

package simplepeekandmulti;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import simplepeekandmulti.StockPlanFileInputAccountData;

@Component
public class StockConsoleOutputItemWriter implements ItemWriter<StockPlanFileInputAccountData> {

    @Override
    public void write(List<? extends StockPlanFileInputAccountData> arg0) throws Exception {
        // TODO Auto-generated method stub

    }
}

Move-files tasklet (file names hard-coded): the last file in the loop always fails.

package simplepeekandmulti;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class StockPlanMoveFilesTasklet implements Tasklet {

    private static final String CLASS_NAME = StockPlanMoveFilesTasklet.class.getSimpleName();

    @Value("$simplepeekandmulti-{INPUT_DIR}")
    private String inputDir;

    @Value("$simplepeekandmulti-{OUTPUT_DIR}")
    private String outputDir;

    private static final String PROCESSED = "_processed";

    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {

        String[] fileList = {"EQ_AcctData_3210_03302020.csv", "EQ_AcctData_4321_03302020.csv"};

        try {
            for (String fileName : fileList) {
                Path pathFrom = FileSystems.getDefault().getPath(inputDir, fileName);
                Path pathTo = FileSystems.getDefault().getPath(outputDir, fileName + PROCESSED);
                Files.move(pathFrom, pathTo, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException io) {
            System.out.println(io.toString());
        }
        return RepeatStatus.FINISHED;
    }
}
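
In the tasklet above, a single try block surrounds the whole loop, so the first IOException stops the remaining moves, and the step still returns RepeatStatus.FINISHED. A minimal JDK-only sketch of moving each file on its own and collecting the failures might look like the following. The names FileMover and moveAll are hypothetical and are not part of the original job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original job.
class FileMover {

    private static final String PROCESSED = "_processed";

    // Moves each named file from inputDir to outputDir, appending "_processed".
    // Returns a map of file name -> error text for the moves that failed,
    // so one locked or missing file does not prevent the others from moving.
    static Map<String, String> moveAll(Path inputDir, Path outputDir, List<String> fileNames) {
        Map<String, String> failures = new LinkedHashMap<>();
        for (String fileName : fileNames) {
            Path from = inputDir.resolve(fileName);
            Path to = outputDir.resolve(fileName + PROCESSED);
            try {
                Files.move(from, to, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                failures.put(fileName, e.toString());
            }
        }
        return failures;
    }
}
```

A tasklet built on this could throw an exception when the returned map is not empty, so that the step fails visibly instead of printing the error and finishing normally.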

Each CSV file contains only a header date, pipe-delimited records, and a footer with the total record count:

03/30/2020
3210|59658625|12000|AADFBCJGH|LLOQMNURS|1962-03-08
3210|10124602|12001|AADFBCJGH|LLOQMNURS|1962-03-08
2

03/30/2020
4321|5690154|13000|AADFBCJGH|LLOQMNURS|1988-10-23
4321|745701|13001|AADFBCJGH|LLOQMNURS|1988-10-23
2
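
The file layout described above (one header line, pipe-delimited detail lines, one footer line holding the record count) can be checked with a few lines of plain Java. This is only a sketch under the assumption that every detail line has six columns, as NUM_RECORDS_PER_LINE in the reader suggests. AccountFileCheck and footerMatchesDetails are hypothetical names.

```java
import java.util.List;

// Hypothetical helper, not part of the original job.
class AccountFileCheck {

    private static final int COLUMNS_PER_DETAIL_LINE = 6;

    // Returns true when every line between the header and the footer has six
    // pipe-delimited columns and the footer equals the number of those lines.
    static boolean footerMatchesDetails(List<String> lines) {
        if (lines.size() < 2) {
            return false; // need at least a header and a footer
        }
        List<String> details = lines.subList(1, lines.size() - 1);
        for (String line : details) {
            // A limit of -1 keeps trailing empty columns in the count.
            if (line.split("\\|", -1).length != COLUMNS_PER_DETAIL_LINE) {
                return false;
            }
        }
        try {
            int footerCount = Integer.parseInt(lines.get(lines.size() - 1).trim());
            return footerCount == details.size();
        } catch (NumberFormatException e) {
            return false; // footer is not a number
        }
    }
}
```

Passing the lines of one of the sample files to footerMatchesDetails should return true, while a footer that disagrees with the number of detail lines should return false.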

Tags: spring-batch, filelock

Solution
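
One likely explanation, offered as a reading of the configuration in the question rather than a confirmed diagnosis: the step's reader, StockPlanAccountDataFileReader, implements only ItemReader. The step therefore has no reason to treat it as an ItemStream, and open and close are never called on the readers behind it. MultiResourceItemReader closes its delegate each time it switches to the next resource, but after the last resource it relies on its own close() to release the file. If that call never arrives, the handle on the last file stays open, and on Windows an open handle blocks Files.move.

Two remedies are commonly suggested for this situation. One is to have the custom reader implement ItemStreamReader and forward open, update, and close to its delegate. The other is to register the delegate as a stream on the chunk. The sketch below models only the forwarding idea, using the JDK alone. LineSource, FileLineSource, ForwardingLineSource, and CloseForwardingDemo are hypothetical stand-ins and are not Spring Batch types.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical stand-in for a reader with a lifecycle; not a Spring Batch type.
interface LineSource extends AutoCloseable {
    String read();           // returns null at end of input

    @Override
    void close();            // releases the underlying file handle
}

// Reads lines from one file and keeps the handle open until close() is called.
class FileLineSource implements LineSource {
    private final BufferedReader in;
    private boolean open = true;

    FileLineSource(Path file) {
        try {
            in = Files.newBufferedReader(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public String read() {
        try {
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void close() {
        try {
            in.close();
            open = false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isOpen() {
        return open;
    }
}

// Wrapper that adds behaviour to read() and forwards close() to its delegate.
// Without the forwarding close(), the inner file handle would stay open.
class ForwardingLineSource implements LineSource {
    private final LineSource delegate;

    ForwardingLineSource(LineSource delegate) {
        this.delegate = delegate;
    }

    public String read() {
        String line = delegate.read();
        return line == null ? null : line.trim();
    }

    public void close() {
        delegate.close();
    }
}

class CloseForwardingDemo {
    // Reads a temp file through the wrapper, closes the wrapper, then moves the file.
    static boolean readCloseThenMove() {
        try {
            Path dir = Files.createTempDirectory("demo");
            Path source = Files.write(dir.resolve("in.csv"), List.of("a|b", "c|d"));
            Path target = dir.resolve("in.csv_processed");
            FileLineSource inner = new FileLineSource(source);
            try (LineSource reader = new ForwardingLineSource(inner)) {
                while (reader.read() != null) {
                    // consume every line
                }
            }
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            return !inner.isOpen() && Files.exists(target) && Files.notExists(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readCloseThenMove());
    }
}
```

In the real job, the equivalent change would be for StockPlanAccountDataFileReader to implement ItemStreamReader and pass open, update, and close through to its delegate. That assumes the delegate field is typed so those methods are reachable; SingleItemPeekableItemReader provides them, but the PeekableItemReader interface used for the field does not.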

